Skip to content

Commit

Permalink
Merge pull request #110 from arangodb/safe-resource-watcher
Browse files Browse the repository at this point in the history
Safe resource watcher
  • Loading branch information
ewoutp authored Apr 5, 2018
2 parents 92416ac + c08dc0b commit e7a508f
Show file tree
Hide file tree
Showing 6 changed files with 268 additions and 175 deletions.
205 changes: 103 additions & 102 deletions pkg/deployment/informers.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,13 @@ package deployment

import (
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/client-go/tools/cache"

"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
)

// listenForPodEvents keep listening for changes in pod until the given channel is closed.
func (d *Deployment) listenForPodEvents(stopCh <-chan struct{}) {
source := cache.NewListWatchFromClient(
d.deps.KubeCli.CoreV1().RESTClient(),
"pods",
d.apiObject.GetNamespace(),
fields.Everything())

getPod := func(obj interface{}) (*v1.Pod, bool) {
pod, ok := obj.(*v1.Pod)
if !ok {
Expand All @@ -49,35 +44,35 @@ func (d *Deployment) listenForPodEvents(stopCh <-chan struct{}) {
return pod, true
}

_, informer := cache.NewIndexerInformer(source, &v1.Pod{}, 0, cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
if p, ok := getPod(obj); ok && d.isOwnerOf(p) {
d.triggerInspection()
}
},
UpdateFunc: func(oldObj, newObj interface{}) {
if p, ok := getPod(newObj); ok && d.isOwnerOf(p) {
d.triggerInspection()
}
},
DeleteFunc: func(obj interface{}) {
if p, ok := getPod(obj); ok && d.isOwnerOf(p) {
d.triggerInspection()
}
},
}, cache.Indexers{})

informer.Run(stopCh)
rw := k8sutil.NewResourceWatcher(
d.deps.Log,
d.deps.KubeCli.CoreV1().RESTClient(),
"pods",
d.apiObject.GetNamespace(),
&v1.Pod{},
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
if p, ok := getPod(obj); ok && d.isOwnerOf(p) {
d.triggerInspection()
}
},
UpdateFunc: func(oldObj, newObj interface{}) {
if p, ok := getPod(newObj); ok && d.isOwnerOf(p) {
d.triggerInspection()
}
},
DeleteFunc: func(obj interface{}) {
if p, ok := getPod(obj); ok && d.isOwnerOf(p) {
d.triggerInspection()
}
},
})

rw.Run(stopCh)
}

// listenForPVCEvents keep listening for changes in PVC's until the given channel is closed.
func (d *Deployment) listenForPVCEvents(stopCh <-chan struct{}) {
source := cache.NewListWatchFromClient(
d.deps.KubeCli.CoreV1().RESTClient(),
"persistentvolumeclaims",
d.apiObject.GetNamespace(),
fields.Everything())

getPVC := func(obj interface{}) (*v1.PersistentVolumeClaim, bool) {
pvc, ok := obj.(*v1.PersistentVolumeClaim)
if !ok {
Expand All @@ -91,35 +86,35 @@ func (d *Deployment) listenForPVCEvents(stopCh <-chan struct{}) {
return pvc, true
}

_, informer := cache.NewIndexerInformer(source, &v1.PersistentVolumeClaim{}, 0, cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
if p, ok := getPVC(obj); ok && d.isOwnerOf(p) {
d.triggerInspection()
}
},
UpdateFunc: func(oldObj, newObj interface{}) {
if p, ok := getPVC(newObj); ok && d.isOwnerOf(p) {
d.triggerInspection()
}
},
DeleteFunc: func(obj interface{}) {
if p, ok := getPVC(obj); ok && d.isOwnerOf(p) {
d.triggerInspection()
}
},
}, cache.Indexers{})

informer.Run(stopCh)
rw := k8sutil.NewResourceWatcher(
d.deps.Log,
d.deps.KubeCli.CoreV1().RESTClient(),
"persistentvolumeclaims",
d.apiObject.GetNamespace(),
&v1.PersistentVolumeClaim{},
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
if p, ok := getPVC(obj); ok && d.isOwnerOf(p) {
d.triggerInspection()
}
},
UpdateFunc: func(oldObj, newObj interface{}) {
if p, ok := getPVC(newObj); ok && d.isOwnerOf(p) {
d.triggerInspection()
}
},
DeleteFunc: func(obj interface{}) {
if p, ok := getPVC(obj); ok && d.isOwnerOf(p) {
d.triggerInspection()
}
},
})

rw.Run(stopCh)
}

// listenForSecretEvents keep listening for changes in Secrets's until the given channel is closed.
func (d *Deployment) listenForSecretEvents(stopCh <-chan struct{}) {
source := cache.NewListWatchFromClient(
d.deps.KubeCli.CoreV1().RESTClient(),
"secrets",
d.apiObject.GetNamespace(),
fields.Everything())

getSecret := func(obj interface{}) (*v1.Secret, bool) {
secret, ok := obj.(*v1.Secret)
if !ok {
Expand All @@ -133,36 +128,36 @@ func (d *Deployment) listenForSecretEvents(stopCh <-chan struct{}) {
return secret, true
}

_, informer := cache.NewIndexerInformer(source, &v1.Secret{}, 0, cache.ResourceEventHandlerFuncs{
// Note: For secrets we look at all of them because they do not have to be owned by this deployment.
AddFunc: func(obj interface{}) {
if _, ok := getSecret(obj); ok {
d.triggerInspection()
}
},
UpdateFunc: func(oldObj, newObj interface{}) {
if _, ok := getSecret(newObj); ok {
d.triggerInspection()
}
},
DeleteFunc: func(obj interface{}) {
if _, ok := getSecret(obj); ok {
d.triggerInspection()
}
},
}, cache.Indexers{})

informer.Run(stopCh)
rw := k8sutil.NewResourceWatcher(
d.deps.Log,
d.deps.KubeCli.CoreV1().RESTClient(),
"secrets",
d.apiObject.GetNamespace(),
&v1.Secret{},
cache.ResourceEventHandlerFuncs{
// Note: For secrets we look at all of them because they do not have to be owned by this deployment.
AddFunc: func(obj interface{}) {
if _, ok := getSecret(obj); ok {
d.triggerInspection()
}
},
UpdateFunc: func(oldObj, newObj interface{}) {
if _, ok := getSecret(newObj); ok {
d.triggerInspection()
}
},
DeleteFunc: func(obj interface{}) {
if _, ok := getSecret(obj); ok {
d.triggerInspection()
}
},
})

rw.Run(stopCh)
}

// listenForServiceEvents keep listening for changes in Service's until the given channel is closed.
func (d *Deployment) listenForServiceEvents(stopCh <-chan struct{}) {
source := cache.NewListWatchFromClient(
d.deps.KubeCli.CoreV1().RESTClient(),
"services",
d.apiObject.GetNamespace(),
fields.Everything())

getService := func(obj interface{}) (*v1.Service, bool) {
service, ok := obj.(*v1.Service)
if !ok {
Expand All @@ -176,23 +171,29 @@ func (d *Deployment) listenForServiceEvents(stopCh <-chan struct{}) {
return service, true
}

_, informer := cache.NewIndexerInformer(source, &v1.Service{}, 0, cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
if s, ok := getService(obj); ok && d.isOwnerOf(s) {
d.triggerInspection()
}
},
UpdateFunc: func(oldObj, newObj interface{}) {
if s, ok := getService(newObj); ok && d.isOwnerOf(s) {
d.triggerInspection()
}
},
DeleteFunc: func(obj interface{}) {
if s, ok := getService(obj); ok && d.isOwnerOf(s) {
d.triggerInspection()
}
},
}, cache.Indexers{})

informer.Run(stopCh)
rw := k8sutil.NewResourceWatcher(
d.deps.Log,
d.deps.KubeCli.CoreV1().RESTClient(),
"services",
d.apiObject.GetNamespace(),
&v1.Service{},
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
if s, ok := getService(obj); ok && d.isOwnerOf(s) {
d.triggerInspection()
}
},
UpdateFunc: func(oldObj, newObj interface{}) {
if s, ok := getService(newObj); ok && d.isOwnerOf(s) {
d.triggerInspection()
}
},
DeleteFunc: func(obj interface{}) {
if s, ok := getService(obj); ok && d.isOwnerOf(s) {
d.triggerInspection()
}
},
})

rw.Run(stopCh)
}
20 changes: 10 additions & 10 deletions pkg/operator/operator_deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@ import (
"fmt"

"github.com/pkg/errors"
"k8s.io/apimachinery/pkg/fields"
kwatch "k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/tools/cache"

api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1alpha"
"github.com/arangodb/kube-arangodb/pkg/deployment"
"github.com/arangodb/kube-arangodb/pkg/metrics"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
)

var (
Expand All @@ -46,20 +46,20 @@ var (
// run the deployments part of the operator.
// This registers a listener and waits until the process stops.
func (o *Operator) runDeployments(stop <-chan struct{}) {
source := cache.NewListWatchFromClient(
rw := k8sutil.NewResourceWatcher(
o.log,
o.Dependencies.CRCli.DatabaseV1alpha().RESTClient(),
api.ArangoDeploymentResourcePlural,
o.Config.Namespace,
fields.Everything())

_, informer := cache.NewIndexerInformer(source, &api.ArangoDeployment{}, 0, cache.ResourceEventHandlerFuncs{
AddFunc: o.onAddArangoDeployment,
UpdateFunc: o.onUpdateArangoDeployment,
DeleteFunc: o.onDeleteArangoDeployment,
}, cache.Indexers{})
&api.ArangoDeployment{},
cache.ResourceEventHandlerFuncs{
AddFunc: o.onAddArangoDeployment,
UpdateFunc: o.onUpdateArangoDeployment,
DeleteFunc: o.onDeleteArangoDeployment,
})

o.Dependencies.DeploymentProbe.SetReady()
informer.Run(stop)
rw.Run(stop)
}

// onAddArangoDeployment deployment addition callback
Expand Down
20 changes: 10 additions & 10 deletions pkg/operator/operator_local_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@ import (
"fmt"

"github.com/pkg/errors"
"k8s.io/apimachinery/pkg/fields"
kwatch "k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/tools/cache"

api "github.com/arangodb/kube-arangodb/pkg/apis/storage/v1alpha"
"github.com/arangodb/kube-arangodb/pkg/metrics"
"github.com/arangodb/kube-arangodb/pkg/storage"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
)

var (
Expand All @@ -46,20 +46,20 @@ var (
// run the local storages part of the operator.
// This registers a listener and waits until the process stops.
func (o *Operator) runLocalStorages(stop <-chan struct{}) {
source := cache.NewListWatchFromClient(
rw := k8sutil.NewResourceWatcher(
o.log,
o.Dependencies.CRCli.StorageV1alpha().RESTClient(),
api.ArangoLocalStorageResourcePlural,
"", //o.Config.Namespace,
fields.Everything())

_, informer := cache.NewIndexerInformer(source, &api.ArangoLocalStorage{}, 0, cache.ResourceEventHandlerFuncs{
AddFunc: o.onAddArangoLocalStorage,
UpdateFunc: o.onUpdateArangoLocalStorage,
DeleteFunc: o.onDeleteArangoLocalStorage,
}, cache.Indexers{})
&api.ArangoLocalStorage{},
cache.ResourceEventHandlerFuncs{
AddFunc: o.onAddArangoLocalStorage,
UpdateFunc: o.onUpdateArangoLocalStorage,
DeleteFunc: o.onDeleteArangoLocalStorage,
})

o.Dependencies.StorageProbe.SetReady()
informer.Run(stop)
rw.Run(stop)
}

// onAddArangoLocalStorage local storage addition callback
Expand Down
Loading

0 comments on commit e7a508f

Please sign in to comment.