From c914e4c7b5ffada0d5ecc128491e700fb285a029 Mon Sep 17 00:00:00 2001
From: Ewout Prangsma
Date: Tue, 26 Jun 2018 10:42:13 +0200
Subject: [PATCH 1/3] Report leader event in owner resource

---
 main.go                                            |  8 ++-
 .../deployment-replication/rbac.yaml               |  4 +-
 manifests/templates/deployment/rbac.yaml           |  4 +-
 manifests/templates/storage/rbac.yaml              |  3 +
 pkg/operator/operator_leader.go                    | 61 +++++++++++++++++-
 pkg/util/k8sutil/owner.go                          | 62 +++++++++++++++++++
 6 files changed, 135 insertions(+), 7 deletions(-)
 create mode 100644 pkg/util/k8sutil/owner.go

diff --git a/main.go b/main.go
index d93bb5d62..f3896250b 100644
--- a/main.go
+++ b/main.go
@@ -36,8 +36,10 @@ import (
 	"github.com/rs/zerolog"
 	"github.com/spf13/cobra"
 	flag "github.com/spf13/pflag"
+	appsv1beta2 "k8s.io/api/apps/v1beta2"
 	"k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/client-go/kubernetes"
 	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
 	"k8s.io/client-go/tools/record"
@@ -271,5 +273,9 @@ func createRecorder(log zerolog.Logger, kubecli kubernetes.Interface, name, name
 		log.Info().Msgf(format, args...)
 	})
 	eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: v1core.New(kubecli.Core().RESTClient()).Events(namespace)})
-	return eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: name})
+	combinedScheme := runtime.NewScheme()
+	scheme.AddToScheme(combinedScheme)
+	v1.AddToScheme(combinedScheme)
+	appsv1beta2.AddToScheme(combinedScheme)
+	return eventBroadcaster.NewRecorder(combinedScheme, v1.EventSource{Component: name})
 }
diff --git a/manifests/templates/deployment-replication/rbac.yaml b/manifests/templates/deployment-replication/rbac.yaml
index 90df2baf4..d32dc7d50 100644
--- a/manifests/templates/deployment-replication/rbac.yaml
+++ b/manifests/templates/deployment-replication/rbac.yaml
@@ -33,8 +33,8 @@ rules:
   resources: ["nodes"]
   verbs: ["get"]
 - apiGroups: ["apps"]
-  resources: ["deployments"]
-  verbs: ["*"]
+  resources: ["deployments", "replicasets"]
+  verbs: ["get"]
 
 ---
 
diff --git a/manifests/templates/deployment/rbac.yaml b/manifests/templates/deployment/rbac.yaml
index b9362d931..5ccb4e647 100644
--- a/manifests/templates/deployment/rbac.yaml
+++ b/manifests/templates/deployment/rbac.yaml
@@ -30,8 +30,8 @@ rules:
   resources: ["nodes"]
   verbs: ["get"]
 - apiGroups: ["apps"]
-  resources: ["deployments"]
-  verbs: ["*"]
+  resources: ["deployments", "replicasets"]
+  verbs: ["get"]
 - apiGroups: ["storage.k8s.io"]
   resources: ["storageclasses"]
   verbs: ["get", "list"]
diff --git a/manifests/templates/storage/rbac.yaml b/manifests/templates/storage/rbac.yaml
index 6f743d913..fdbc28f99 100644
--- a/manifests/templates/storage/rbac.yaml
+++ b/manifests/templates/storage/rbac.yaml
@@ -33,6 +33,9 @@ rules:
 - apiGroups: ["apps"]
   resources: ["daemonsets"]
   verbs: ["*"]
+- apiGroups: ["apps"]
+  resources: ["deployments", "replicasets"]
+  verbs: ["get"]
 - apiGroups: ["storage.k8s.io"]
   resources: ["storageclasses"]
   verbs: ["*"]
diff --git a/pkg/operator/operator_leader.go b/pkg/operator/operator_leader.go
index 5d94c2058..dfa5c0d72 100644
--- a/pkg/operator/operator_leader.go
+++ b/pkg/operator/operator_leader.go
@@ -23,16 +23,34 @@
 package operator
 
 import (
+	"fmt"
+	"os"
 	"time"
 
+	"github.com/rs/zerolog"
+	"k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/client-go/tools/leaderelection"
 	"k8s.io/client-go/tools/leaderelection/resourcelock"
+
+	"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
 )
 
+// runLeaderElection performs a leader election on a lock with given name in
+// the namespace that the operator is deployed in.
+// When the leader election is won, the given callback is called.
+// When the leader election is lost (even after it was won once), the process is killed.
 func (o *Operator) runLeaderElection(lockName string, onStart func(stop <-chan struct{})) {
 	namespace := o.Config.Namespace
 	kubecli := o.Dependencies.KubeCli
 	log := o.log.With().Str("lock-name", lockName).Logger()
+	eventTarget := o.getLeaderElectionEventTarget(log)
+	recordEvent := func(reason, message string) {
+		if eventTarget != nil {
+			o.Dependencies.EventRecorder.Event(eventTarget, v1.EventTypeNormal, reason, message)
+		}
+	}
 	rl, err := resourcelock.New(resourcelock.EndpointsResourceLock,
 		namespace,
 		lockName,
@@ -51,10 +69,49 @@ func (o *Operator) runLeaderElection(lockName string, onStart func(stop <-chan s
 		RenewDeadline: 10 * time.Second,
 		RetryPeriod:   2 * time.Second,
 		Callbacks: leaderelection.LeaderCallbacks{
-			OnStartedLeading: onStart,
+			OnStartedLeading: func(stop <-chan struct{}) {
+				recordEvent("Leader Election Won", fmt.Sprintf("Pod %s is running as leader", o.Config.PodName))
+				onStart(stop)
+			},
 			OnStoppedLeading: func() {
-				log.Info().Msg("Leader election lost")
+				recordEvent("Stop Leading", fmt.Sprintf("Pod %s is stopping to run as leader", o.Config.PodName))
+				log.Info().Msg("Stop leading. Terminating process")
+				os.Exit(1)
 			},
 		},
 	})
 }
+
+// getLeaderElectionEventTarget returns the object that leader election related
+// events will be added to.
+func (o *Operator) getLeaderElectionEventTarget(log zerolog.Logger) runtime.Object {
+	ns := o.Config.Namespace
+	kubecli := o.Dependencies.KubeCli
+	pods := kubecli.CoreV1().Pods(ns)
+	log = log.With().Str("pod-name", o.Config.PodName).Logger()
+	pod, err := pods.Get(o.Config.PodName, metav1.GetOptions{})
+	if err != nil {
+		log.Error().Err(err).Msg("Cannot find Pod containing this operator")
+		return nil
+	}
+	rSet, err := k8sutil.GetPodOwner(kubecli, pod, ns)
+	if err != nil {
+		log.Error().Err(err).Msg("Cannot find ReplicaSet owning the Pod containing this operator")
+		return pod
+	}
+	if rSet == nil {
+		log.Error().Msg("Pod containing this operator has no ReplicaSet owner")
+		return pod
+	}
+	log = log.With().Str("replicaSet-name", rSet.Name).Logger()
+	depl, err := k8sutil.GetReplicaSetOwner(kubecli, rSet, ns)
+	if err != nil {
+		log.Error().Err(err).Msg("Cannot find Deployment owning the ReplicaSet that owns the Pod containing this operator")
+		return rSet
+	}
+	if depl == nil {
+		log.Error().Msg("ReplicaSet that owns the Pod containing this operator has no Deployment owner")
+		return rSet
+	}
+	return depl
+}
diff --git a/pkg/util/k8sutil/owner.go b/pkg/util/k8sutil/owner.go
new file mode 100644
index 000000000..2daf15027
--- /dev/null
+++ b/pkg/util/k8sutil/owner.go
@@ -0,0 +1,62 @@
+//
+// DISCLAIMER
+//
+// Copyright 2018 ArangoDB GmbH, Cologne, Germany
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+// Copyright holder is ArangoDB GmbH, Cologne, Germany
+//
+// Author Ewout Prangsma
+//
+
+package k8sutil
+
+import (
+	"k8s.io/api/apps/v1beta2"
+	"k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/client-go/kubernetes"
+)
+
+// GetPodOwner returns the ReplicaSet that owns the given Pod.
+// If the Pod has no owner or the owner is not a ReplicaSet, nil is returned.
+func GetPodOwner(kubecli kubernetes.Interface, pod *v1.Pod, ns string) (*v1beta2.ReplicaSet, error) {
+	for _, ref := range pod.GetOwnerReferences() {
+		if ref.Kind == "ReplicaSet" {
+			rSets := kubecli.AppsV1beta2().ReplicaSets(pod.GetNamespace())
+			rSet, err := rSets.Get(ref.Name, metav1.GetOptions{})
+			if err != nil {
+				return nil, maskAny(err)
+			}
+			return rSet, nil
+		}
+	}
+	return nil, nil
+}
+
+// GetReplicaSetOwner returns the Deployment that owns the given ReplicaSet.
+// If the ReplicaSet has no owner or the owner is not a Deployment, nil is returned.
+func GetReplicaSetOwner(kubecli kubernetes.Interface, rSet *v1beta2.ReplicaSet, ns string) (*v1beta2.Deployment, error) {
+	for _, ref := range rSet.GetOwnerReferences() {
+		if ref.Kind == "Deployment" {
+			depls := kubecli.AppsV1beta2().Deployments(rSet.GetNamespace())
+			depl, err := depls.Get(ref.Name, metav1.GetOptions{})
+			if err != nil {
+				return nil, maskAny(err)
+			}
+			return depl, nil
+		}
+	}
+	return nil, nil
+}

From 7051e00c896a7fdf84602264fb3e78083307e603 Mon Sep 17 00:00:00 2001
From: Ewout Prangsma
Date: Tue, 26 Jun 2018 10:52:37 +0200
Subject: [PATCH 2/3] Set operator Pod ready when leader change is detected

---
 pkg/operator/operator.go        |  6 +++---
 pkg/operator/operator_leader.go | 10 +++++++++-
 2 files changed, 12 insertions(+), 4 deletions(-)

diff --git a/pkg/operator/operator.go b/pkg/operator/operator.go
index ba43136b4..f4ba43b23 100644
--- a/pkg/operator/operator.go
+++ b/pkg/operator/operator.go
@@ -105,13 +105,13 @@ func NewOperator(config Config, deps Dependencies) (*Operator, error) {
 // Run the operator
 func (o *Operator) Run() {
 	if o.Config.EnableDeployment {
-		go o.runLeaderElection("arango-deployment-operator", o.onStartDeployment)
+		go o.runLeaderElection("arango-deployment-operator", o.onStartDeployment, o.Dependencies.DeploymentProbe)
 	}
 	if o.Config.EnableDeploymentReplication {
-		go o.runLeaderElection("arango-deployment-replication-operator", o.onStartDeploymentReplication)
+		go o.runLeaderElection("arango-deployment-replication-operator", o.onStartDeploymentReplication, o.Dependencies.DeploymentReplicationProbe)
 	}
 	if o.Config.EnableStorage {
-		go o.runLeaderElection("arango-storage-operator", o.onStartStorage)
+		go o.runLeaderElection("arango-storage-operator", o.onStartStorage, o.Dependencies.StorageProbe)
 	}
 	// Wait until process terminates
 	<-context.TODO().Done()
diff --git a/pkg/operator/operator_leader.go b/pkg/operator/operator_leader.go
index dfa5c0d72..b34b6a4c1 100644
--- a/pkg/operator/operator_leader.go
+++ b/pkg/operator/operator_leader.go
@@ -35,13 +35,16 @@ import (
 	"k8s.io/client-go/tools/leaderelection/resourcelock"
 
 	"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
+	"github.com/arangodb/kube-arangodb/pkg/util/probe"
 )
 
 // runLeaderElection performs a leader election on a lock with given name in
 // the namespace that the operator is deployed in.
 // When the leader election is won, the given callback is called.
 // When the leader election is lost (even after it was won once), the process is killed.
-func (o *Operator) runLeaderElection(lockName string, onStart func(stop <-chan struct{})) {
+// The given ready probe is set as soon as this process becomes the leader, or a new leader
+// is detected.
+func (o *Operator) runLeaderElection(lockName string, onStart func(stop <-chan struct{}), readyProbe *probe.ReadyProbe) {
 	namespace := o.Config.Namespace
 	kubecli := o.Dependencies.KubeCli
 	log := o.log.With().Str("lock-name", lockName).Logger()
@@ -71,6 +74,7 @@
 		Callbacks: leaderelection.LeaderCallbacks{
 			OnStartedLeading: func(stop <-chan struct{}) {
 				recordEvent("Leader Election Won", fmt.Sprintf("Pod %s is running as leader", o.Config.PodName))
+				readyProbe.SetReady()
 				onStart(stop)
 			},
 			OnStoppedLeading: func() {
@@ -78,6 +82,10 @@ func (o *Operator) runLeaderElection(lockName string, onStart func(stop <-chan s
 				log.Info().Msg("Stop leading. Terminating process")
 				os.Exit(1)
 			},
+			OnNewLeader: func(identity string) {
+				log.Info().Str("identity", identity).Msg("New leader detected")
+				readyProbe.SetReady()
+			},
 		},
 	})
 }

From b415c7c90d9b5c26c4e91b9ad0730d09da88997c Mon Sep 17 00:00:00 2001
From: Ewout Prangsma
Date: Tue, 26 Jun 2018 13:31:49 +0200
Subject: [PATCH 3/3] Fixed comment

---
 pkg/operator/operator_leader.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pkg/operator/operator_leader.go b/pkg/operator/operator_leader.go
index b34b6a4c1..26a597f06 100644
--- a/pkg/operator/operator_leader.go
+++ b/pkg/operator/operator_leader.go
@@ -41,7 +41,7 @@ import (
 // runLeaderElection performs a leader election on a lock with given name in
 // the namespace that the operator is deployed in.
 // When the leader election is won, the given callback is called.
-// When the leader election is lost (even after it was won once), the process is killed.
+// When the leader election was won once, but then the leadership is lost, the process is killed.
 // The given ready probe is set as soon as this process becomes the leader, or a new leader
 // is detected.
 func (o *Operator) runLeaderElection(lockName string, onStart func(stop <-chan struct{}), readyProbe *probe.ReadyProbe) {
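
Note for readers (outside the patch series): the sketch below shows how the new k8sutil.GetPodOwner and k8sutil.GetReplicaSetOwner helpers from patch 1 can be chained to resolve the Deployment that ultimately owns a Pod, mirroring what getLeaderElectionEventTarget does above. It is a minimal, hedged example; the namespace "default" and the pod name "arango-operator-pod" are hypothetical placeholders, and the fallback order (Pod, then ReplicaSet, then Deployment) follows the patch.

```go
package main

import (
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"

	"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
)

// resolveEventTarget walks Pod -> ReplicaSet -> Deployment and returns the
// highest-level owner it can resolve, falling back to the lower-level object.
func resolveEventTarget(kubecli kubernetes.Interface, ns, podName string) (runtime.Object, error) {
	pod, err := kubecli.CoreV1().Pods(ns).Get(podName, metav1.GetOptions{})
	if err != nil {
		return nil, err
	}
	rSet, err := k8sutil.GetPodOwner(kubecli, pod, ns)
	if err != nil || rSet == nil {
		// No ReplicaSet owner (e.g. a bare Pod): attach events to the Pod itself.
		return pod, nil
	}
	depl, err := k8sutil.GetReplicaSetOwner(kubecli, rSet, ns)
	if err != nil || depl == nil {
		// No Deployment owner (e.g. a standalone ReplicaSet): use the ReplicaSet.
		return rSet, nil
	}
	return depl, nil
}

func main() {
	cfg, err := rest.InClusterConfig()
	if err != nil {
		panic(err)
	}
	kubecli := kubernetes.NewForConfigOrDie(cfg)
	// "default" and "arango-operator-pod" are placeholder values.
	target, err := resolveEventTarget(kubecli, "default", "arango-operator-pod")
	if err != nil {
		panic(err)
	}
	fmt.Printf("leader election events would be attached to a %T\n", target)
}
```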
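A similarly hedged sketch of the callback wiring from patches 2 and 3, assuming the client-go version in use at the time (the pre-context leaderelection API whose OnStartedLeading callback takes a stop channel, as in the diffs above). The lock name, namespace, and identity are placeholders; the resourcelock.New arguments beyond the first three are not shown in the diff and are assumed here; and a plain log statement stands in for readyProbe.SetReady().

```go
package main

import (
	"fmt"
	"os"
	"time"

	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/leaderelection"
	"k8s.io/client-go/tools/leaderelection/resourcelock"
)

func main() {
	cfg, err := rest.InClusterConfig()
	if err != nil {
		panic(err)
	}
	kubecli := kubernetes.NewForConfigOrDie(cfg)

	// Endpoints-based lock, as used by the operator; names are placeholders.
	// A production setup would also set EventRecorder here, as the operator does.
	rl, err := resourcelock.New(resourcelock.EndpointsResourceLock,
		"default",
		"demo-operator-lock",
		kubecli.CoreV1(),
		resourcelock.ResourceLockConfig{Identity: "demo-pod"})
	if err != nil {
		panic(err)
	}

	// Stand-in for readyProbe.SetReady(): the pod is ready once it either
	// becomes the leader or observes who the current leader is.
	setReady := func() { fmt.Println("pod can be reported ready") }

	leaderelection.RunOrDie(leaderelection.LeaderElectionConfig{
		Lock:          rl,
		LeaseDuration: 15 * time.Second,
		RenewDeadline: 10 * time.Second,
		RetryPeriod:   2 * time.Second,
		Callbacks: leaderelection.LeaderCallbacks{
			OnStartedLeading: func(stop <-chan struct{}) {
				setReady()
				fmt.Println("became leader, starting controllers")
				<-stop
			},
			OnStoppedLeading: func() {
				// Losing leadership after having held it: terminate so that
				// Kubernetes restarts the pod in a clean, non-leading state.
				fmt.Println("lost leadership")
				os.Exit(1)
			},
			OnNewLeader: func(identity string) {
				setReady()
				fmt.Println("current leader:", identity)
			},
		},
	})
}
```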