Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Safe resource watcher #110

Merged
merged 2 commits into from
Apr 5, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
205 changes: 103 additions & 102 deletions pkg/deployment/informers.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,13 @@ package deployment

import (
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/client-go/tools/cache"

"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
)

// listenForPodEvents keep listening for changes in pod until the given channel is closed.
func (d *Deployment) listenForPodEvents(stopCh <-chan struct{}) {
source := cache.NewListWatchFromClient(
d.deps.KubeCli.CoreV1().RESTClient(),
"pods",
d.apiObject.GetNamespace(),
fields.Everything())

getPod := func(obj interface{}) (*v1.Pod, bool) {
pod, ok := obj.(*v1.Pod)
if !ok {
Expand All @@ -49,35 +44,35 @@ func (d *Deployment) listenForPodEvents(stopCh <-chan struct{}) {
return pod, true
}

_, informer := cache.NewIndexerInformer(source, &v1.Pod{}, 0, cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
if p, ok := getPod(obj); ok && d.isOwnerOf(p) {
d.triggerInspection()
}
},
UpdateFunc: func(oldObj, newObj interface{}) {
if p, ok := getPod(newObj); ok && d.isOwnerOf(p) {
d.triggerInspection()
}
},
DeleteFunc: func(obj interface{}) {
if p, ok := getPod(obj); ok && d.isOwnerOf(p) {
d.triggerInspection()
}
},
}, cache.Indexers{})

informer.Run(stopCh)
rw := k8sutil.NewResourceWatcher(
d.deps.Log,
d.deps.KubeCli.CoreV1().RESTClient(),
"pods",
d.apiObject.GetNamespace(),
&v1.Pod{},
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
if p, ok := getPod(obj); ok && d.isOwnerOf(p) {
d.triggerInspection()
}
},
UpdateFunc: func(oldObj, newObj interface{}) {
if p, ok := getPod(newObj); ok && d.isOwnerOf(p) {
d.triggerInspection()
}
},
DeleteFunc: func(obj interface{}) {
if p, ok := getPod(obj); ok && d.isOwnerOf(p) {
d.triggerInspection()
}
},
})

rw.Run(stopCh)
}

// listenForPVCEvents keep listening for changes in PVC's until the given channel is closed.
func (d *Deployment) listenForPVCEvents(stopCh <-chan struct{}) {
source := cache.NewListWatchFromClient(
d.deps.KubeCli.CoreV1().RESTClient(),
"persistentvolumeclaims",
d.apiObject.GetNamespace(),
fields.Everything())

getPVC := func(obj interface{}) (*v1.PersistentVolumeClaim, bool) {
pvc, ok := obj.(*v1.PersistentVolumeClaim)
if !ok {
Expand All @@ -91,35 +86,35 @@ func (d *Deployment) listenForPVCEvents(stopCh <-chan struct{}) {
return pvc, true
}

_, informer := cache.NewIndexerInformer(source, &v1.PersistentVolumeClaim{}, 0, cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
if p, ok := getPVC(obj); ok && d.isOwnerOf(p) {
d.triggerInspection()
}
},
UpdateFunc: func(oldObj, newObj interface{}) {
if p, ok := getPVC(newObj); ok && d.isOwnerOf(p) {
d.triggerInspection()
}
},
DeleteFunc: func(obj interface{}) {
if p, ok := getPVC(obj); ok && d.isOwnerOf(p) {
d.triggerInspection()
}
},
}, cache.Indexers{})

informer.Run(stopCh)
rw := k8sutil.NewResourceWatcher(
d.deps.Log,
d.deps.KubeCli.CoreV1().RESTClient(),
"persistentvolumeclaims",
d.apiObject.GetNamespace(),
&v1.PersistentVolumeClaim{},
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
if p, ok := getPVC(obj); ok && d.isOwnerOf(p) {
d.triggerInspection()
}
},
UpdateFunc: func(oldObj, newObj interface{}) {
if p, ok := getPVC(newObj); ok && d.isOwnerOf(p) {
d.triggerInspection()
}
},
DeleteFunc: func(obj interface{}) {
if p, ok := getPVC(obj); ok && d.isOwnerOf(p) {
d.triggerInspection()
}
},
})

rw.Run(stopCh)
}

// listenForSecretEvents keep listening for changes in Secrets's until the given channel is closed.
func (d *Deployment) listenForSecretEvents(stopCh <-chan struct{}) {
source := cache.NewListWatchFromClient(
d.deps.KubeCli.CoreV1().RESTClient(),
"secrets",
d.apiObject.GetNamespace(),
fields.Everything())

getSecret := func(obj interface{}) (*v1.Secret, bool) {
secret, ok := obj.(*v1.Secret)
if !ok {
Expand All @@ -133,36 +128,36 @@ func (d *Deployment) listenForSecretEvents(stopCh <-chan struct{}) {
return secret, true
}

_, informer := cache.NewIndexerInformer(source, &v1.Secret{}, 0, cache.ResourceEventHandlerFuncs{
// Note: For secrets we look at all of them because they do not have to be owned by this deployment.
AddFunc: func(obj interface{}) {
if _, ok := getSecret(obj); ok {
d.triggerInspection()
}
},
UpdateFunc: func(oldObj, newObj interface{}) {
if _, ok := getSecret(newObj); ok {
d.triggerInspection()
}
},
DeleteFunc: func(obj interface{}) {
if _, ok := getSecret(obj); ok {
d.triggerInspection()
}
},
}, cache.Indexers{})

informer.Run(stopCh)
rw := k8sutil.NewResourceWatcher(
d.deps.Log,
d.deps.KubeCli.CoreV1().RESTClient(),
"secrets",
d.apiObject.GetNamespace(),
&v1.Secret{},
cache.ResourceEventHandlerFuncs{
// Note: For secrets we look at all of them because they do not have to be owned by this deployment.
AddFunc: func(obj interface{}) {
if _, ok := getSecret(obj); ok {
d.triggerInspection()
}
},
UpdateFunc: func(oldObj, newObj interface{}) {
if _, ok := getSecret(newObj); ok {
d.triggerInspection()
}
},
DeleteFunc: func(obj interface{}) {
if _, ok := getSecret(obj); ok {
d.triggerInspection()
}
},
})

rw.Run(stopCh)
}

// listenForServiceEvents keep listening for changes in Service's until the given channel is closed.
func (d *Deployment) listenForServiceEvents(stopCh <-chan struct{}) {
source := cache.NewListWatchFromClient(
d.deps.KubeCli.CoreV1().RESTClient(),
"services",
d.apiObject.GetNamespace(),
fields.Everything())

getService := func(obj interface{}) (*v1.Service, bool) {
service, ok := obj.(*v1.Service)
if !ok {
Expand All @@ -176,23 +171,29 @@ func (d *Deployment) listenForServiceEvents(stopCh <-chan struct{}) {
return service, true
}

_, informer := cache.NewIndexerInformer(source, &v1.Service{}, 0, cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
if s, ok := getService(obj); ok && d.isOwnerOf(s) {
d.triggerInspection()
}
},
UpdateFunc: func(oldObj, newObj interface{}) {
if s, ok := getService(newObj); ok && d.isOwnerOf(s) {
d.triggerInspection()
}
},
DeleteFunc: func(obj interface{}) {
if s, ok := getService(obj); ok && d.isOwnerOf(s) {
d.triggerInspection()
}
},
}, cache.Indexers{})

informer.Run(stopCh)
rw := k8sutil.NewResourceWatcher(
d.deps.Log,
d.deps.KubeCli.CoreV1().RESTClient(),
"services",
d.apiObject.GetNamespace(),
&v1.Service{},
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
if s, ok := getService(obj); ok && d.isOwnerOf(s) {
d.triggerInspection()
}
},
UpdateFunc: func(oldObj, newObj interface{}) {
if s, ok := getService(newObj); ok && d.isOwnerOf(s) {
d.triggerInspection()
}
},
DeleteFunc: func(obj interface{}) {
if s, ok := getService(obj); ok && d.isOwnerOf(s) {
d.triggerInspection()
}
},
})

rw.Run(stopCh)
}
20 changes: 10 additions & 10 deletions pkg/operator/operator_deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@ import (
"fmt"

"github.com/pkg/errors"
"k8s.io/apimachinery/pkg/fields"
kwatch "k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/tools/cache"

api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1alpha"
"github.com/arangodb/kube-arangodb/pkg/deployment"
"github.com/arangodb/kube-arangodb/pkg/metrics"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
)

var (
Expand All @@ -46,20 +46,20 @@ var (
// run the deployments part of the operator.
// This registers a listener and waits until the process stops.
func (o *Operator) runDeployments(stop <-chan struct{}) {
source := cache.NewListWatchFromClient(
rw := k8sutil.NewResourceWatcher(
o.log,
o.Dependencies.CRCli.DatabaseV1alpha().RESTClient(),
api.ArangoDeploymentResourcePlural,
o.Config.Namespace,
fields.Everything())

_, informer := cache.NewIndexerInformer(source, &api.ArangoDeployment{}, 0, cache.ResourceEventHandlerFuncs{
AddFunc: o.onAddArangoDeployment,
UpdateFunc: o.onUpdateArangoDeployment,
DeleteFunc: o.onDeleteArangoDeployment,
}, cache.Indexers{})
&api.ArangoDeployment{},
cache.ResourceEventHandlerFuncs{
AddFunc: o.onAddArangoDeployment,
UpdateFunc: o.onUpdateArangoDeployment,
DeleteFunc: o.onDeleteArangoDeployment,
})

o.Dependencies.DeploymentProbe.SetReady()
informer.Run(stop)
rw.Run(stop)
}

// onAddArangoDeployment deployment addition callback
Expand Down
20 changes: 10 additions & 10 deletions pkg/operator/operator_local_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@ import (
"fmt"

"github.com/pkg/errors"
"k8s.io/apimachinery/pkg/fields"
kwatch "k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/tools/cache"

api "github.com/arangodb/kube-arangodb/pkg/apis/storage/v1alpha"
"github.com/arangodb/kube-arangodb/pkg/metrics"
"github.com/arangodb/kube-arangodb/pkg/storage"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
)

var (
Expand All @@ -46,20 +46,20 @@ var (
// run the local storages part of the operator.
// This registers a listener and waits until the process stops.
func (o *Operator) runLocalStorages(stop <-chan struct{}) {
source := cache.NewListWatchFromClient(
rw := k8sutil.NewResourceWatcher(
o.log,
o.Dependencies.CRCli.StorageV1alpha().RESTClient(),
api.ArangoLocalStorageResourcePlural,
"", //o.Config.Namespace,
fields.Everything())

_, informer := cache.NewIndexerInformer(source, &api.ArangoLocalStorage{}, 0, cache.ResourceEventHandlerFuncs{
AddFunc: o.onAddArangoLocalStorage,
UpdateFunc: o.onUpdateArangoLocalStorage,
DeleteFunc: o.onDeleteArangoLocalStorage,
}, cache.Indexers{})
&api.ArangoLocalStorage{},
cache.ResourceEventHandlerFuncs{
AddFunc: o.onAddArangoLocalStorage,
UpdateFunc: o.onUpdateArangoLocalStorage,
DeleteFunc: o.onDeleteArangoLocalStorage,
})

o.Dependencies.StorageProbe.SetReady()
informer.Run(stop)
rw.Run(stop)
}

// onAddArangoLocalStorage local storage addition callback
Expand Down
Loading