Skip to content

Commit

Permalink
Merge pull request #74 from arangodb/watch-more-resources
Browse files Browse the repository at this point in the history
Also watch changes in PVCs and Services
  • Loading branch information
ewoutp authored Mar 26, 2018
2 parents e0329ea + fba9d33 commit 284b2e2
Show file tree
Hide file tree
Showing 6 changed files with 211 additions and 115 deletions.
27 changes: 13 additions & 14 deletions pkg/deployment/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,16 +60,12 @@ type deploymentEventType string

const (
eventArangoDeploymentUpdated deploymentEventType = "ArangoDeploymentUpdated"
eventPodAdded deploymentEventType = "PodAdded"
eventPodUpdated deploymentEventType = "PodUpdated"
eventPodDeleted deploymentEventType = "PodDeleted"
)

// deploymentEvent holds an event passed from the controller to the deployment.
type deploymentEvent struct {
Type deploymentEventType
Deployment *api.ArangoDeployment
Pod *v1.Pod
}

const (
Expand All @@ -91,6 +87,7 @@ type Deployment struct {
eventsCli corev1.EventInterface

inspectTrigger trigger.Trigger
updateDeploymentTrigger trigger.Trigger
clientCache *clientCache
recentInspectionErrors int
clusterScalingIntegration *clusterScalingIntegration
Expand Down Expand Up @@ -121,7 +118,9 @@ func New(config Config, deps Dependencies, apiObject *api.ArangoDeployment) (*De
}

go d.run()
go d.listenForPodEvents()
go d.listenForPodEvents(d.stopCh)
go d.listenForPVCEvents(d.stopCh)
go d.listenForServiceEvents(d.stopCh)
if apiObject.Spec.GetMode() == api.DeploymentModeCluster {
ci := newClusterScalingIntegration(d)
d.clusterScalingIntegration = ci
Expand Down Expand Up @@ -217,20 +216,20 @@ func (d *Deployment) run() {
// Got event from event queue
switch event.Type {
case eventArangoDeploymentUpdated:
if err := d.handleArangoDeploymentUpdatedEvent(event); err != nil {
d.failOnError(err, "Failed to handle deployment update")
return
}
case eventPodAdded, eventPodUpdated, eventPodDeleted:
// Pod event received, let's inspect soon
d.inspectTrigger.Trigger()
d.updateDeploymentTrigger.Trigger()
default:
panic("unknown event type" + event.Type)
}

case <-d.inspectTrigger.Done():
inspectionInterval = d.inspectDeployment(inspectionInterval)

case <-d.updateDeploymentTrigger.Done():
if err := d.handleArangoDeploymentUpdatedEvent(); err != nil {
d.failOnError(err, "Failed to handle deployment update")
return
}

case <-time.After(inspectionInterval):
// Trigger inspection
d.inspectTrigger.Trigger()
Expand All @@ -244,8 +243,8 @@ func (d *Deployment) run() {
}

// handleArangoDeploymentUpdatedEvent is called when the deployment is updated by the user.
func (d *Deployment) handleArangoDeploymentUpdatedEvent(event *deploymentEvent) error {
log := d.deps.Log.With().Str("deployment", event.Deployment.GetName()).Logger()
func (d *Deployment) handleArangoDeploymentUpdatedEvent() error {
log := d.deps.Log.With().Str("deployment", d.apiObject.GetName()).Logger()

// Get the most recent version of the deployment from the API server
current, err := d.deps.DatabaseCRCli.DatabaseV1alpha().ArangoDeployments(d.apiObject.GetNamespace()).Get(d.apiObject.GetName(), metav1.GetOptions{})
Expand Down
5 changes: 5 additions & 0 deletions pkg/deployment/deployment_inspector.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,3 +101,8 @@ func (d *Deployment) inspectDeployment(lastInterval time.Duration) time.Duration
}
return nextInterval
}

// triggerInspection ensures that an inspection is run soon.
func (d *Deployment) triggerInspection() {
d.inspectTrigger.Trigger()
}
155 changes: 155 additions & 0 deletions pkg/deployment/informers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
//
// DISCLAIMER
//
// Copyright 2018 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
// Author Ewout Prangsma
//

package deployment

import (
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/client-go/tools/cache"
)

// listenForPodEvents keep listening for changes in pod until the given channel is closed.
func (d *Deployment) listenForPodEvents(stopCh <-chan struct{}) {
source := cache.NewListWatchFromClient(
d.deps.KubeCli.CoreV1().RESTClient(),
"pods",
d.apiObject.GetNamespace(),
fields.Everything())

getPod := func(obj interface{}) (*v1.Pod, bool) {
pod, ok := obj.(*v1.Pod)
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
return nil, false
}
pod, ok = tombstone.Obj.(*v1.Pod)
return pod, ok
}
return pod, true
}

_, informer := cache.NewIndexerInformer(source, &v1.Pod{}, 0, cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
if p, ok := getPod(obj); ok && d.isOwnerOf(p) {
d.triggerInspection()
}
},
UpdateFunc: func(oldObj, newObj interface{}) {
if p, ok := getPod(newObj); ok && d.isOwnerOf(p) {
d.triggerInspection()
}
},
DeleteFunc: func(obj interface{}) {
if p, ok := getPod(obj); ok && d.isOwnerOf(p) {
d.triggerInspection()
}
},
}, cache.Indexers{})

informer.Run(stopCh)
}

// listenForPVCEvents keep listening for changes in PVC's until the given channel is closed.
func (d *Deployment) listenForPVCEvents(stopCh <-chan struct{}) {
source := cache.NewListWatchFromClient(
d.deps.KubeCli.CoreV1().RESTClient(),
"persistentvolumeclaims",
d.apiObject.GetNamespace(),
fields.Everything())

getPVC := func(obj interface{}) (*v1.PersistentVolumeClaim, bool) {
pvc, ok := obj.(*v1.PersistentVolumeClaim)
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
return nil, false
}
pvc, ok = tombstone.Obj.(*v1.PersistentVolumeClaim)
return pvc, ok
}
return pvc, true
}

_, informer := cache.NewIndexerInformer(source, &v1.PersistentVolumeClaim{}, 0, cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
if p, ok := getPVC(obj); ok && d.isOwnerOf(p) {
d.triggerInspection()
}
},
UpdateFunc: func(oldObj, newObj interface{}) {
if p, ok := getPVC(newObj); ok && d.isOwnerOf(p) {
d.triggerInspection()
}
},
DeleteFunc: func(obj interface{}) {
if p, ok := getPVC(obj); ok && d.isOwnerOf(p) {
d.triggerInspection()
}
},
}, cache.Indexers{})

informer.Run(stopCh)
}

// listenForServiceEvents keep listening for changes in Service's until the given channel is closed.
func (d *Deployment) listenForServiceEvents(stopCh <-chan struct{}) {
source := cache.NewListWatchFromClient(
d.deps.KubeCli.CoreV1().RESTClient(),
"services",
d.apiObject.GetNamespace(),
fields.Everything())

getService := func(obj interface{}) (*v1.Service, bool) {
service, ok := obj.(*v1.Service)
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
return nil, false
}
service, ok = tombstone.Obj.(*v1.Service)
return service, ok
}
return service, true
}

_, informer := cache.NewIndexerInformer(source, &v1.Service{}, 0, cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
if s, ok := getService(obj); ok && d.isOwnerOf(s) {
d.triggerInspection()
}
},
UpdateFunc: func(oldObj, newObj interface{}) {
if s, ok := getService(newObj); ok && d.isOwnerOf(s) {
d.triggerInspection()
}
},
DeleteFunc: func(obj interface{}) {
if s, ok := getService(obj); ok && d.isOwnerOf(s) {
d.triggerInspection()
}
},
}, cache.Indexers{})

informer.Run(stopCh)
}
80 changes: 0 additions & 80 deletions pkg/deployment/pod_informer.go

This file was deleted.

18 changes: 13 additions & 5 deletions pkg/deployment/resources/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,18 +34,23 @@ func (r *Resources) EnsureServices() error {
owner := apiObject.AsOwner()
spec := r.context.GetSpec()

log.Debug().Msg("creating services...")

if _, err := k8sutil.CreateHeadlessService(kubecli, apiObject, owner); err != nil {
svcName, newlyCreated, err := k8sutil.CreateHeadlessService(kubecli, apiObject, owner)
if err != nil {
log.Debug().Err(err).Msg("Failed to create headless service")
return maskAny(err)
}
if newlyCreated {
log.Debug().Str("service", svcName).Msg("Created headless service")
}
single := spec.GetMode().HasSingleServers()
svcName, err := k8sutil.CreateDatabaseClientService(kubecli, apiObject, single, owner)
svcName, newlyCreated, err = k8sutil.CreateDatabaseClientService(kubecli, apiObject, single, owner)
if err != nil {
log.Debug().Err(err).Msg("Failed to create database client service")
return maskAny(err)
}
if newlyCreated {
log.Debug().Str("service", svcName).Msg("Created database client service")
}
status := r.context.GetStatus()
if status.ServiceName != svcName {
status.ServiceName = svcName
Expand All @@ -55,11 +60,14 @@ func (r *Resources) EnsureServices() error {
}

if spec.Sync.IsEnabled() {
svcName, err := k8sutil.CreateSyncMasterClientService(kubecli, apiObject, owner)
svcName, newlyCreated, err := k8sutil.CreateSyncMasterClientService(kubecli, apiObject, owner)
if err != nil {
log.Debug().Err(err).Msg("Failed to create syncmaster client service")
return maskAny(err)
}
if newlyCreated {
log.Debug().Str("service", svcName).Msg("Created syncmasters service")
}
status := r.context.GetStatus()
if status.SyncServiceName != svcName {
status.SyncServiceName = svcName
Expand Down
Loading

0 comments on commit 284b2e2

Please sign in to comment.