diff --git a/pkg/deployment/deployment.go b/pkg/deployment/deployment.go
index 7a9c723f5..25b80f2e5 100644
--- a/pkg/deployment/deployment.go
+++ b/pkg/deployment/deployment.go
@@ -60,16 +60,12 @@ type deploymentEventType string
 
 const (
 	eventArangoDeploymentUpdated deploymentEventType = "ArangoDeploymentUpdated"
-	eventPodAdded                deploymentEventType = "PodAdded"
-	eventPodUpdated              deploymentEventType = "PodUpdated"
-	eventPodDeleted              deploymentEventType = "PodDeleted"
 )
 
 // deploymentEvent holds an event passed from the controller to the deployment.
 type deploymentEvent struct {
 	Type       deploymentEventType
 	Deployment *api.ArangoDeployment
-	Pod        *v1.Pod
 }
 
 const (
@@ -91,6 +87,7 @@ type Deployment struct {
 	eventsCli corev1.EventInterface
 
 	inspectTrigger            trigger.Trigger
+	updateDeploymentTrigger   trigger.Trigger
 	clientCache               *clientCache
 	recentInspectionErrors    int
 	clusterScalingIntegration *clusterScalingIntegration
@@ -121,7 +118,9 @@ func New(config Config, deps Dependencies, apiObject *api.ArangoDeployment) (*De
 	}
 
 	go d.run()
-	go d.listenForPodEvents()
+	go d.listenForPodEvents(d.stopCh)
+	go d.listenForPVCEvents(d.stopCh)
+	go d.listenForServiceEvents(d.stopCh)
 	if apiObject.Spec.GetMode() == api.DeploymentModeCluster {
 		ci := newClusterScalingIntegration(d)
 		d.clusterScalingIntegration = ci
@@ -217,13 +216,7 @@ func (d *Deployment) run() {
 			// Got event from event queue
 			switch event.Type {
 			case eventArangoDeploymentUpdated:
-				if err := d.handleArangoDeploymentUpdatedEvent(event); err != nil {
-					d.failOnError(err, "Failed to handle deployment update")
-					return
-				}
-			case eventPodAdded, eventPodUpdated, eventPodDeleted:
-				// Pod event received, let's inspect soon
-				d.inspectTrigger.Trigger()
+				d.updateDeploymentTrigger.Trigger()
 			default:
 				panic("unknown event type" + event.Type)
 			}
@@ -231,6 +224,12 @@ func (d *Deployment) run() {
 		case <-d.inspectTrigger.Done():
 			inspectionInterval = d.inspectDeployment(inspectionInterval)
 
+		case <-d.updateDeploymentTrigger.Done():
+			if err := d.handleArangoDeploymentUpdatedEvent(); err != nil {
+				d.failOnError(err, "Failed to handle deployment update")
+				return
+			}
+
 		case <-time.After(inspectionInterval):
 			// Trigger inspection
 			d.inspectTrigger.Trigger()
@@ -244,8 +243,8 @@ func (d *Deployment) run() {
 }
 
 // handleArangoDeploymentUpdatedEvent is called when the deployment is updated by the user.
-func (d *Deployment) handleArangoDeploymentUpdatedEvent(event *deploymentEvent) error {
-	log := d.deps.Log.With().Str("deployment", event.Deployment.GetName()).Logger()
+func (d *Deployment) handleArangoDeploymentUpdatedEvent() error {
+	log := d.deps.Log.With().Str("deployment", d.apiObject.GetName()).Logger()
 
 	// Get the most recent version of the deployment from the API server
 	current, err := d.deps.DatabaseCRCli.DatabaseV1alpha().ArangoDeployments(d.apiObject.GetNamespace()).Get(d.apiObject.GetName(), metav1.GetOptions{})
diff --git a/pkg/deployment/deployment_inspector.go b/pkg/deployment/deployment_inspector.go
index 8f629d4fd..b3311facf 100644
--- a/pkg/deployment/deployment_inspector.go
+++ b/pkg/deployment/deployment_inspector.go
@@ -101,3 +101,8 @@ func (d *Deployment) inspectDeployment(lastInterval time.Duration) time.Duration
 	}
 	return nextInterval
 }
+
+// triggerInspection ensures that an inspection is run soon.
+func (d *Deployment) triggerInspection() {
+	d.inspectTrigger.Trigger()
+}
diff --git a/pkg/deployment/informers.go b/pkg/deployment/informers.go
new file mode 100644
index 000000000..2cc34ab63
--- /dev/null
+++ b/pkg/deployment/informers.go
@@ -0,0 +1,155 @@
+//
+// DISCLAIMER
+//
+// Copyright 2018 ArangoDB GmbH, Cologne, Germany
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+// Copyright holder is ArangoDB GmbH, Cologne, Germany
+//
+// Author Ewout Prangsma
+//
+
+package deployment
+
+import (
+	"k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/fields"
+	"k8s.io/client-go/tools/cache"
+)
+
+// listenForPodEvents keeps listening for changes in Pods until the given channel is closed.
+func (d *Deployment) listenForPodEvents(stopCh <-chan struct{}) {
+	source := cache.NewListWatchFromClient(
+		d.deps.KubeCli.CoreV1().RESTClient(),
+		"pods",
+		d.apiObject.GetNamespace(),
+		fields.Everything())
+
+	getPod := func(obj interface{}) (*v1.Pod, bool) {
+		pod, ok := obj.(*v1.Pod)
+		if !ok {
+			tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
+			if !ok {
+				return nil, false
+			}
+			pod, ok = tombstone.Obj.(*v1.Pod)
+			return pod, ok
+		}
+		return pod, true
+	}
+
+	_, informer := cache.NewIndexerInformer(source, &v1.Pod{}, 0, cache.ResourceEventHandlerFuncs{
+		AddFunc: func(obj interface{}) {
+			if p, ok := getPod(obj); ok && d.isOwnerOf(p) {
+				d.triggerInspection()
+			}
+		},
+		UpdateFunc: func(oldObj, newObj interface{}) {
+			if p, ok := getPod(newObj); ok && d.isOwnerOf(p) {
+				d.triggerInspection()
+			}
+		},
+		DeleteFunc: func(obj interface{}) {
+			if p, ok := getPod(obj); ok && d.isOwnerOf(p) {
+				d.triggerInspection()
+			}
+		},
+	}, cache.Indexers{})
+
+	informer.Run(stopCh)
+}
+
+// listenForPVCEvents keeps listening for changes in PVCs until the given channel is closed.
+func (d *Deployment) listenForPVCEvents(stopCh <-chan struct{}) {
+	source := cache.NewListWatchFromClient(
+		d.deps.KubeCli.CoreV1().RESTClient(),
+		"persistentvolumeclaims",
+		d.apiObject.GetNamespace(),
+		fields.Everything())
+
+	getPVC := func(obj interface{}) (*v1.PersistentVolumeClaim, bool) {
+		pvc, ok := obj.(*v1.PersistentVolumeClaim)
+		if !ok {
+			tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
+			if !ok {
+				return nil, false
+			}
+			pvc, ok = tombstone.Obj.(*v1.PersistentVolumeClaim)
+			return pvc, ok
+		}
+		return pvc, true
+	}
+
+	_, informer := cache.NewIndexerInformer(source, &v1.PersistentVolumeClaim{}, 0, cache.ResourceEventHandlerFuncs{
+		AddFunc: func(obj interface{}) {
+			if p, ok := getPVC(obj); ok && d.isOwnerOf(p) {
+				d.triggerInspection()
+			}
+		},
+		UpdateFunc: func(oldObj, newObj interface{}) {
+			if p, ok := getPVC(newObj); ok && d.isOwnerOf(p) {
+				d.triggerInspection()
+			}
+		},
+		DeleteFunc: func(obj interface{}) {
+			if p, ok := getPVC(obj); ok && d.isOwnerOf(p) {
+				d.triggerInspection()
+			}
+		},
+	}, cache.Indexers{})
+
+	informer.Run(stopCh)
+}
+
+// listenForServiceEvents keeps listening for changes in Services until the given channel is closed.
+func (d *Deployment) listenForServiceEvents(stopCh <-chan struct{}) {
+	source := cache.NewListWatchFromClient(
+		d.deps.KubeCli.CoreV1().RESTClient(),
+		"services",
+		d.apiObject.GetNamespace(),
+		fields.Everything())
+
+	getService := func(obj interface{}) (*v1.Service, bool) {
+		service, ok := obj.(*v1.Service)
+		if !ok {
+			tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
+			if !ok {
+				return nil, false
+			}
+			service, ok = tombstone.Obj.(*v1.Service)
+			return service, ok
+		}
+		return service, true
+	}
+
+	_, informer := cache.NewIndexerInformer(source, &v1.Service{}, 0, cache.ResourceEventHandlerFuncs{
+		AddFunc: func(obj interface{}) {
+			if s, ok := getService(obj); ok && d.isOwnerOf(s) {
+				d.triggerInspection()
+			}
+		},
+		UpdateFunc: func(oldObj, newObj interface{}) {
+			if s, ok := getService(newObj); ok && d.isOwnerOf(s) {
+				d.triggerInspection()
+			}
+		},
+		DeleteFunc: func(obj interface{}) {
+			if s, ok := getService(obj); ok && d.isOwnerOf(s) {
+				d.triggerInspection()
+			}
+		},
+	}, cache.Indexers{})
+
+	informer.Run(stopCh)
+}
diff --git a/pkg/deployment/pod_informer.go b/pkg/deployment/pod_informer.go
deleted file mode 100644
index ddcf8f3bd..000000000
--- a/pkg/deployment/pod_informer.go
+++ /dev/null
@@ -1,80 +0,0 @@
-//
-// DISCLAIMER
-//
-// Copyright 2018 ArangoDB GmbH, Cologne, Germany
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-//
-// Copyright holder is ArangoDB GmbH, Cologne, Germany
-//
-// Author Ewout Prangsma
-//
-
-package deployment
-
-import (
-	"k8s.io/api/core/v1"
-	"k8s.io/apimachinery/pkg/fields"
-	"k8s.io/client-go/tools/cache"
-)
-
-// listenForPodEvents keep listening for changes in pod until the given channel is closed.
-func (d *Deployment) listenForPodEvents() {
-	source := cache.NewListWatchFromClient(
-		d.deps.KubeCli.CoreV1().RESTClient(),
-		"pods",
-		d.apiObject.GetNamespace(),
-		fields.Everything())
-
-	getPod := func(obj interface{}) (*v1.Pod, bool) {
-		pod, ok := obj.(*v1.Pod)
-		if !ok {
-			tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
-			if !ok {
-				return nil, false
-			}
-			pod, ok = tombstone.Obj.(*v1.Pod)
-			return pod, ok
-		}
-		return pod, true
-	}
-
-	_, informer := cache.NewIndexerInformer(source, &v1.Pod{}, 0, cache.ResourceEventHandlerFuncs{
-		AddFunc: func(obj interface{}) {
-			if p, ok := getPod(obj); ok && d.isOwnerOf(p) {
-				d.send(&deploymentEvent{
-					Type: eventPodAdded,
-					Pod:  p,
-				})
-			}
-		},
-		UpdateFunc: func(oldObj, newObj interface{}) {
-			if p, ok := getPod(newObj); ok && d.isOwnerOf(p) {
-				d.send(&deploymentEvent{
-					Type: eventPodUpdated,
-					Pod:  p,
-				})
-			}
-		},
-		DeleteFunc: func(obj interface{}) {
-			if p, ok := getPod(obj); ok && d.isOwnerOf(p) {
-				d.send(&deploymentEvent{
-					Type: eventPodDeleted,
-					Pod:  p,
-				})
-			}
-		},
-	}, cache.Indexers{})
-
-	informer.Run(d.stopCh)
-}
diff --git a/pkg/deployment/resources/services.go b/pkg/deployment/resources/services.go
index 5831a5e0f..49ec7558f 100644
--- a/pkg/deployment/resources/services.go
+++ b/pkg/deployment/resources/services.go
@@ -34,18 +34,23 @@ func (r *Resources) EnsureServices() error {
 	owner := apiObject.AsOwner()
 	spec := r.context.GetSpec()
 
-	log.Debug().Msg("creating services...")
-
-	if _, err := k8sutil.CreateHeadlessService(kubecli, apiObject, owner); err != nil {
+	svcName, newlyCreated, err := k8sutil.CreateHeadlessService(kubecli, apiObject, owner)
+	if err != nil {
 		log.Debug().Err(err).Msg("Failed to create headless service")
 		return maskAny(err)
 	}
+	if newlyCreated {
+		log.Debug().Str("service", svcName).Msg("Created headless service")
+	}
 	single := spec.GetMode().HasSingleServers()
-	svcName, err := k8sutil.CreateDatabaseClientService(kubecli, apiObject, single, owner)
+	svcName, newlyCreated, err = k8sutil.CreateDatabaseClientService(kubecli, apiObject, single, owner)
 	if err != nil {
 		log.Debug().Err(err).Msg("Failed to create database client service")
 		return maskAny(err)
 	}
+	if newlyCreated {
+		log.Debug().Str("service", svcName).Msg("Created database client service")
+	}
 	status := r.context.GetStatus()
 	if status.ServiceName != svcName {
 		status.ServiceName = svcName
@@ -55,11 +60,14 @@ func (r *Resources) EnsureServices() error {
 	}
 
 	if spec.Sync.IsEnabled() {
-		svcName, err := k8sutil.CreateSyncMasterClientService(kubecli, apiObject, owner)
+		svcName, newlyCreated, err := k8sutil.CreateSyncMasterClientService(kubecli, apiObject, owner)
 		if err != nil {
 			log.Debug().Err(err).Msg("Failed to create syncmaster client service")
 			return maskAny(err)
 		}
+		if newlyCreated {
+			log.Debug().Str("service", svcName).Msg("Created syncmasters service")
+		}
 		status := r.context.GetStatus()
 		if status.SyncServiceName != svcName {
 			status.SyncServiceName = svcName
diff --git a/pkg/util/k8sutil/services.go b/pkg/util/k8sutil/services.go
index e881f3cc0..16cdeccc3 100644
--- a/pkg/util/k8sutil/services.go
+++ b/pkg/util/k8sutil/services.go
@@ -50,7 +50,8 @@ func CreateSyncMasterClientServiceName(deploymentName string) string {
 // DNS name for all pods.
 // If the service already exists, nil is returned.
 // If another error occurs, that error is returned.
-func CreateHeadlessService(kubecli kubernetes.Interface, deployment metav1.Object, owner metav1.OwnerReference) (string, error) {
+// The returned bool is true if the service is created, or false when the service already existed.
+func CreateHeadlessService(kubecli kubernetes.Interface, deployment metav1.Object, owner metav1.OwnerReference) (string, bool, error) {
 	deploymentName := deployment.GetName()
 	svcName := CreateHeadlessServiceName(deploymentName)
 	ports := []v1.ServicePort{
@@ -62,16 +63,18 @@ func CreateHeadlessService(kubecli kubernetes.Interface, deployment metav1.Objec
 	}
 	publishNotReadyAddresses := false
 	sessionAffinity := v1.ServiceAffinityNone
-	if err := createService(kubecli, svcName, deploymentName, deployment.GetNamespace(), ClusterIPNone, "", ports, publishNotReadyAddresses, sessionAffinity, owner); err != nil {
-		return "", maskAny(err)
+	newlyCreated, err := createService(kubecli, svcName, deploymentName, deployment.GetNamespace(), ClusterIPNone, "", ports, publishNotReadyAddresses, sessionAffinity, owner)
+	if err != nil {
+		return "", false, maskAny(err)
 	}
-	return svcName, nil
+	return svcName, newlyCreated, nil
 }
 
 // CreateDatabaseClientService prepares and creates a service in k8s, used by database clients within the k8s cluster.
 // If the service already exists, nil is returned.
 // If another error occurs, that error is returned.
-func CreateDatabaseClientService(kubecli kubernetes.Interface, deployment metav1.Object, single bool, owner metav1.OwnerReference) (string, error) {
+// The returned bool is true if the service is created, or false when the service already existed.
+func CreateDatabaseClientService(kubecli kubernetes.Interface, deployment metav1.Object, single bool, owner metav1.OwnerReference) (string, bool, error) {
 	deploymentName := deployment.GetName()
 	svcName := CreateDatabaseClientServiceName(deploymentName)
 	ports := []v1.ServicePort{
@@ -89,16 +92,18 @@ func CreateDatabaseClientService(kubecli kubernetes.Interface, deployment metav1
 	}
 	publishNotReadyAddresses := true
 	sessionAffinity := v1.ServiceAffinityClientIP
-	if err := createService(kubecli, svcName, deploymentName, deployment.GetNamespace(), "", role, ports, publishNotReadyAddresses, sessionAffinity, owner); err != nil {
-		return "", maskAny(err)
+	newlyCreated, err := createService(kubecli, svcName, deploymentName, deployment.GetNamespace(), "", role, ports, publishNotReadyAddresses, sessionAffinity, owner)
+	if err != nil {
+		return "", false, maskAny(err)
 	}
-	return svcName, nil
+	return svcName, newlyCreated, nil
 }
 
 // CreateSyncMasterClientService prepares and creates a service in k8s, used by syncmaster clients within the k8s cluster.
 // If the service already exists, nil is returned.
 // If another error occurs, that error is returned.
+// The returned bool is true if the service is created, or false when the service already existed.
+func CreateSyncMasterClientService(kubecli kubernetes.Interface, deployment metav1.Object, owner metav1.OwnerReference) (string, bool, error) {
 	deploymentName := deployment.GetName()
 	svcName := CreateSyncMasterClientServiceName(deploymentName)
 	ports := []v1.ServicePort{
@@ -110,17 +115,19 @@ func CreateSyncMasterClientService(kubecli kubernetes.Interface, deployment meta
 	}
 	publishNotReadyAddresses := true
 	sessionAffinity := v1.ServiceAffinityNone
-	if err := createService(kubecli, svcName, deploymentName, deployment.GetNamespace(), "", "syncmaster", ports, publishNotReadyAddresses, sessionAffinity, owner); err != nil {
-		return "", maskAny(err)
+	newlyCreated, err := createService(kubecli, svcName, deploymentName, deployment.GetNamespace(), "", "syncmaster", ports, publishNotReadyAddresses, sessionAffinity, owner)
+	if err != nil {
+		return "", false, maskAny(err)
 	}
-	return svcName, nil
+	return svcName, newlyCreated, nil
 }
 
 // createService prepares and creates a service in k8s.
 // If the service already exists, nil is returned.
 // If another error occurs, that error is returned.
+// The returned bool is true if the service is created, or false when the service already existed.
 func createService(kubecli kubernetes.Interface, svcName, deploymentName, ns, clusterIP, role string,
-	ports []v1.ServicePort, publishNotReadyAddresses bool, sessionAffinity v1.ServiceAffinity, owner metav1.OwnerReference) error {
+	ports []v1.ServicePort, publishNotReadyAddresses bool, sessionAffinity v1.ServiceAffinity, owner metav1.OwnerReference) (bool, error) {
 	labels := LabelsForDeployment(deploymentName, role)
 	svc := &v1.Service{
 		ObjectMeta: metav1.ObjectMeta{
@@ -142,8 +149,10 @@ func createService(kubecli kubernetes.Interface, svcName, deploymentName, ns, cl
 		},
 	}
 	addOwnerRefToObject(svc.GetObjectMeta(), &owner)
-	if _, err := kubecli.CoreV1().Services(ns).Create(svc); err != nil && !IsAlreadyExists(err) {
-		return maskAny(err)
+	if _, err := kubecli.CoreV1().Services(ns).Create(svc); IsAlreadyExists(err) {
+		return false, nil
+	} else if err != nil {
+		return false, maskAny(err)
 	}
-	return nil
+	return true, nil
 }
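
Note on the trigger mechanism used in `deployment.go` above: the `run()` loop no longer handles Pod events inline; it wakes up through `trigger.Trigger` values (`inspectTrigger`, `updateDeploymentTrigger`) that the informers and the event queue poke. The sketch below shows a minimal coalescing trigger with the `Trigger()`/`Done()` surface this code relies on. It is an illustration of the idea only, assuming the semantics of the operator's `pkg/util/trigger` package rather than copying it.

```go
package trigger

import "sync"

// Trigger coalesces any number of Trigger() calls into at most one pending
// signal that is consumed from the channel returned by Done().
// Minimal sketch only; the operator's own pkg/util/trigger may differ.
type Trigger struct {
	mu      sync.Mutex
	pending chan struct{}
}

// Done returns the channel that fires after Trigger has been called.
func (t *Trigger) Done() <-chan struct{} {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.ensure()
	return t.pending
}

// Trigger requests a wake-up; repeated calls before the consumer reads
// from Done() collapse into a single pending signal.
func (t *Trigger) Trigger() {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.ensure()
	select {
	case t.pending <- struct{}{}:
	default: // a signal is already pending; coalesce
	}
}

// ensure lazily creates the buffered channel so the zero value is usable,
// matching how the Deployment struct embeds trigger.Trigger without setup.
func (t *Trigger) ensure() {
	if t.pending == nil {
		t.pending = make(chan struct{}, 1)
	}
}
```

A buffered channel of size one gives the "many requests, one inspection" behavior the select loop in `run()` depends on: however many informer events arrive between iterations, `inspectDeployment` runs once.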
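
The informer handlers in `informers.go` only act on objects for which `d.isOwnerOf(...)` returns true, so one operator instance does not react to Pods, PVCs, or Services belonging to other deployments in the namespace. A typical way to implement such a filter is to match the object's OwnerReferences against the deployment's UID; the helper below is a hedged sketch of that check (the name `isOwnedBy` is illustrative and not taken from this PR).

```go
package deployment

import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// isOwnedBy reports whether obj lists owner (matched by UID) among its
// OwnerReferences. Sketch of the kind of check isOwnerOf performs; the
// operator's actual implementation may differ in detail.
func isOwnedBy(obj metav1.Object, owner metav1.Object) bool {
	for _, ref := range obj.GetOwnerReferences() {
		if ref.UID == owner.GetUID() {
			return true
		}
	}
	return false
}
```

Matching on UID rather than name guards against reacting to a resource that was deleted and recreated under the same name by a different owner.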