Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Also watch changes in PVCs and Services #74

Merged
merged 4 commits into from
Mar 26, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 13 additions & 14 deletions pkg/deployment/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,16 +60,12 @@ type deploymentEventType string

const (
eventArangoDeploymentUpdated deploymentEventType = "ArangoDeploymentUpdated"
eventPodAdded deploymentEventType = "PodAdded"
eventPodUpdated deploymentEventType = "PodUpdated"
eventPodDeleted deploymentEventType = "PodDeleted"
)

// deploymentEvent holds an event passed from the controller to the deployment.
type deploymentEvent struct {
Type deploymentEventType
Deployment *api.ArangoDeployment
Pod *v1.Pod
}

const (
Expand All @@ -91,6 +87,7 @@ type Deployment struct {
eventsCli corev1.EventInterface

inspectTrigger trigger.Trigger
updateDeploymentTrigger trigger.Trigger
clientCache *clientCache
recentInspectionErrors int
clusterScalingIntegration *clusterScalingIntegration
Expand Down Expand Up @@ -121,7 +118,9 @@ func New(config Config, deps Dependencies, apiObject *api.ArangoDeployment) (*De
}

go d.run()
go d.listenForPodEvents()
go d.listenForPodEvents(d.stopCh)
go d.listenForPVCEvents(d.stopCh)
go d.listenForServiceEvents(d.stopCh)
if apiObject.Spec.GetMode() == api.DeploymentModeCluster {
ci := newClusterScalingIntegration(d)
d.clusterScalingIntegration = ci
Expand Down Expand Up @@ -217,20 +216,20 @@ func (d *Deployment) run() {
// Got event from event queue
switch event.Type {
case eventArangoDeploymentUpdated:
if err := d.handleArangoDeploymentUpdatedEvent(event); err != nil {
d.failOnError(err, "Failed to handle deployment update")
return
}
case eventPodAdded, eventPodUpdated, eventPodDeleted:
// Pod event received, let's inspect soon
d.inspectTrigger.Trigger()
d.updateDeploymentTrigger.Trigger()
default:
panic("unknown event type" + event.Type)
}

case <-d.inspectTrigger.Done():
inspectionInterval = d.inspectDeployment(inspectionInterval)

case <-d.updateDeploymentTrigger.Done():
if err := d.handleArangoDeploymentUpdatedEvent(); err != nil {
d.failOnError(err, "Failed to handle deployment update")
return
}

case <-time.After(inspectionInterval):
// Trigger inspection
d.inspectTrigger.Trigger()
Expand All @@ -244,8 +243,8 @@ func (d *Deployment) run() {
}

// handleArangoDeploymentUpdatedEvent is called when the deployment is updated by the user.
func (d *Deployment) handleArangoDeploymentUpdatedEvent(event *deploymentEvent) error {
log := d.deps.Log.With().Str("deployment", event.Deployment.GetName()).Logger()
func (d *Deployment) handleArangoDeploymentUpdatedEvent() error {
log := d.deps.Log.With().Str("deployment", d.apiObject.GetName()).Logger()

// Get the most recent version of the deployment from the API server
current, err := d.deps.DatabaseCRCli.DatabaseV1alpha().ArangoDeployments(d.apiObject.GetNamespace()).Get(d.apiObject.GetName(), metav1.GetOptions{})
Expand Down
5 changes: 5 additions & 0 deletions pkg/deployment/deployment_inspector.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,3 +101,8 @@ func (d *Deployment) inspectDeployment(lastInterval time.Duration) time.Duration
}
return nextInterval
}

// triggerInspection ensures that an inspection is run soon.
func (d *Deployment) triggerInspection() {
d.inspectTrigger.Trigger()
}
155 changes: 155 additions & 0 deletions pkg/deployment/informers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
//
// DISCLAIMER
//
// Copyright 2018 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
// Author Ewout Prangsma
//

package deployment

import (
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/client-go/tools/cache"
)

// listenForPodEvents keep listening for changes in pod until the given channel is closed.
func (d *Deployment) listenForPodEvents(stopCh <-chan struct{}) {
source := cache.NewListWatchFromClient(
d.deps.KubeCli.CoreV1().RESTClient(),
"pods",
d.apiObject.GetNamespace(),
fields.Everything())

getPod := func(obj interface{}) (*v1.Pod, bool) {
pod, ok := obj.(*v1.Pod)
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
return nil, false
}
pod, ok = tombstone.Obj.(*v1.Pod)
return pod, ok
}
return pod, true
}

_, informer := cache.NewIndexerInformer(source, &v1.Pod{}, 0, cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
if p, ok := getPod(obj); ok && d.isOwnerOf(p) {
d.triggerInspection()
}
},
UpdateFunc: func(oldObj, newObj interface{}) {
if p, ok := getPod(newObj); ok && d.isOwnerOf(p) {
d.triggerInspection()
}
},
DeleteFunc: func(obj interface{}) {
if p, ok := getPod(obj); ok && d.isOwnerOf(p) {
d.triggerInspection()
}
},
}, cache.Indexers{})

informer.Run(stopCh)
}

// listenForPVCEvents keep listening for changes in PVC's until the given channel is closed.
func (d *Deployment) listenForPVCEvents(stopCh <-chan struct{}) {
source := cache.NewListWatchFromClient(
d.deps.KubeCli.CoreV1().RESTClient(),
"persistentvolumeclaims",
d.apiObject.GetNamespace(),
fields.Everything())

getPVC := func(obj interface{}) (*v1.PersistentVolumeClaim, bool) {
pvc, ok := obj.(*v1.PersistentVolumeClaim)
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
return nil, false
}
pvc, ok = tombstone.Obj.(*v1.PersistentVolumeClaim)
return pvc, ok
}
return pvc, true
}

_, informer := cache.NewIndexerInformer(source, &v1.PersistentVolumeClaim{}, 0, cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
if p, ok := getPVC(obj); ok && d.isOwnerOf(p) {
d.triggerInspection()
}
},
UpdateFunc: func(oldObj, newObj interface{}) {
if p, ok := getPVC(newObj); ok && d.isOwnerOf(p) {
d.triggerInspection()
}
},
DeleteFunc: func(obj interface{}) {
if p, ok := getPVC(obj); ok && d.isOwnerOf(p) {
d.triggerInspection()
}
},
}, cache.Indexers{})

informer.Run(stopCh)
}

// listenForServiceEvents keep listening for changes in Service's until the given channel is closed.
func (d *Deployment) listenForServiceEvents(stopCh <-chan struct{}) {
source := cache.NewListWatchFromClient(
d.deps.KubeCli.CoreV1().RESTClient(),
"services",
d.apiObject.GetNamespace(),
fields.Everything())

getService := func(obj interface{}) (*v1.Service, bool) {
service, ok := obj.(*v1.Service)
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
return nil, false
}
service, ok = tombstone.Obj.(*v1.Service)
return service, ok
}
return service, true
}

_, informer := cache.NewIndexerInformer(source, &v1.Service{}, 0, cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
if s, ok := getService(obj); ok && d.isOwnerOf(s) {
d.triggerInspection()
}
},
UpdateFunc: func(oldObj, newObj interface{}) {
if s, ok := getService(newObj); ok && d.isOwnerOf(s) {
d.triggerInspection()
}
},
DeleteFunc: func(obj interface{}) {
if s, ok := getService(obj); ok && d.isOwnerOf(s) {
d.triggerInspection()
}
},
}, cache.Indexers{})

informer.Run(stopCh)
}
80 changes: 0 additions & 80 deletions pkg/deployment/pod_informer.go

This file was deleted.

18 changes: 13 additions & 5 deletions pkg/deployment/resources/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,18 +34,23 @@ func (r *Resources) EnsureServices() error {
owner := apiObject.AsOwner()
spec := r.context.GetSpec()

log.Debug().Msg("creating services...")

if _, err := k8sutil.CreateHeadlessService(kubecli, apiObject, owner); err != nil {
svcName, newlyCreated, err := k8sutil.CreateHeadlessService(kubecli, apiObject, owner)
if err != nil {
log.Debug().Err(err).Msg("Failed to create headless service")
return maskAny(err)
}
if newlyCreated {
log.Debug().Str("service", svcName).Msg("Created headless service")
}
single := spec.GetMode().HasSingleServers()
svcName, err := k8sutil.CreateDatabaseClientService(kubecli, apiObject, single, owner)
svcName, newlyCreated, err = k8sutil.CreateDatabaseClientService(kubecli, apiObject, single, owner)
if err != nil {
log.Debug().Err(err).Msg("Failed to create database client service")
return maskAny(err)
}
if newlyCreated {
log.Debug().Str("service", svcName).Msg("Created database client service")
}
status := r.context.GetStatus()
if status.ServiceName != svcName {
status.ServiceName = svcName
Expand All @@ -55,11 +60,14 @@ func (r *Resources) EnsureServices() error {
}

if spec.Sync.IsEnabled() {
svcName, err := k8sutil.CreateSyncMasterClientService(kubecli, apiObject, owner)
svcName, newlyCreated, err := k8sutil.CreateSyncMasterClientService(kubecli, apiObject, owner)
if err != nil {
log.Debug().Err(err).Msg("Failed to create syncmaster client service")
return maskAny(err)
}
if newlyCreated {
log.Debug().Str("service", svcName).Msg("Created syncmasters service")
}
status := r.context.GetStatus()
if status.SyncServiceName != svcName {
status.SyncServiceName = svcName
Expand Down
Loading