All operator Pods will now reach the Ready state. #201

Merged · 3 commits · Jun 26, 2018
Changes from 2 commits
8 changes: 7 additions & 1 deletion main.go
@@ -36,8 +36,10 @@ import (
"github.com/rs/zerolog"
"github.com/spf13/cobra"
flag "github.com/spf13/pflag"
appsv1beta2 "k8s.io/api/apps/v1beta2"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/record"
@@ -271,5 +273,9 @@ func createRecorder(log zerolog.Logger, kubecli kubernetes.Interface, name, name
log.Info().Msgf(format, args...)
})
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: v1core.New(kubecli.Core().RESTClient()).Events(namespace)})
return eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: name})
combinedScheme := runtime.NewScheme()
scheme.AddToScheme(combinedScheme)
v1.AddToScheme(combinedScheme)
appsv1beta2.AddToScheme(combinedScheme)
return eventBroadcaster.NewRecorder(combinedScheme, v1.EventSource{Component: name})
}
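
Why a combined scheme: an event recorder resolves the GroupVersionKind of the object an event is attached to through the scheme it was created with, and the leader-election events added below are attached to the operator's own Deployment (apps/v1beta2). A minimal sketch of the failure mode this change avoids, assuming the same imports as main.go plus k8s.io/client-go/tools/reference imported as ref:

	combinedScheme := runtime.NewScheme()
	scheme.AddToScheme(combinedScheme)      // default client-go types
	v1.AddToScheme(combinedScheme)          // core/v1: Pod, Event, ...
	appsv1beta2.AddToScheme(combinedScheme) // apps/v1beta2: Deployment, ReplicaSet

	// Resolving a reference only works for types registered in the scheme;
	// without the apps/v1beta2 line above this returns an error, and events
	// recorded against the Deployment would not be recorded at all.
	if _, err := ref.GetReference(combinedScheme, &appsv1beta2.Deployment{}); err != nil {
		panic(err)
	}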
4 changes: 2 additions & 2 deletions manifests/templates/deployment-replication/rbac.yaml
@@ -33,8 +33,8 @@ rules:
resources: ["nodes"]
verbs: ["get"]
- apiGroups: ["apps"]
resources: ["deployments"]
verbs: ["*"]
resources: ["deployments", "replicasets"]
verbs: ["get"]

---

4 changes: 2 additions & 2 deletions manifests/templates/deployment/rbac.yaml
@@ -30,8 +30,8 @@ rules:
resources: ["nodes"]
verbs: ["get"]
- apiGroups: ["apps"]
resources: ["deployments"]
verbs: ["*"]
resources: ["deployments", "replicasets"]
verbs: ["get"]
- apiGroups: ["storage.k8s.io"]
resources: ["storageclasses"]
verbs: ["get", "list"]
3 changes: 3 additions & 0 deletions manifests/templates/storage/rbac.yaml
@@ -33,6 +33,9 @@ rules:
- apiGroups: ["apps"]
resources: ["daemonsets"]
verbs: ["*"]
- apiGroups: ["apps"]
resources: ["deployments", "replicasets"]
verbs: ["get"]
- apiGroups: ["storage.k8s.io"]
resources: ["storageclasses"]
verbs: ["*"]
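
The "get" rights on deployments and replicasets added to all three roles are what allow the operator to walk the Pod → ReplicaSet → Deployment owner chain in getLeaderElectionEventTarget below. A hedged sketch, not part of this PR, of how the permission could be verified from inside the operator Pod with a SelfSubjectAccessReview (authv1 is k8s.io/api/authorization/v1; kubecli and namespace as in the operator code):

	sar := &authv1.SelfSubjectAccessReview{
		Spec: authv1.SelfSubjectAccessReviewSpec{
			ResourceAttributes: &authv1.ResourceAttributes{
				Namespace: namespace, // assumed: the operator's own namespace
				Group:     "apps",
				Resource:  "replicasets",
				Verb:      "get",
			},
		},
	}
	res, err := kubecli.AuthorizationV1().SelfSubjectAccessReviews().Create(sar)
	if err == nil && !res.Status.Allowed {
		// the RBAC rules above have not been applied to this service account
	}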
6 changes: 3 additions & 3 deletions pkg/operator/operator.go
@@ -105,13 +105,13 @@ func NewOperator(config Config, deps Dependencies) (*Operator, error) {
// Run the operator
func (o *Operator) Run() {
if o.Config.EnableDeployment {
go o.runLeaderElection("arango-deployment-operator", o.onStartDeployment)
go o.runLeaderElection("arango-deployment-operator", o.onStartDeployment, o.Dependencies.DeploymentProbe)
}
if o.Config.EnableDeploymentReplication {
go o.runLeaderElection("arango-deployment-replication-operator", o.onStartDeploymentReplication)
go o.runLeaderElection("arango-deployment-replication-operator", o.onStartDeploymentReplication, o.Dependencies.DeploymentReplicationProbe)
}
if o.Config.EnableStorage {
go o.runLeaderElection("arango-storage-operator", o.onStartStorage)
go o.runLeaderElection("arango-storage-operator", o.onStartStorage, o.Dependencies.StorageProbe)
}
// Wait until process terminates
<-context.TODO().Done()
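
The *probe.ReadyProbe values passed to runLeaderElection are not defined in this diff. A minimal sketch of the shape the call sites assume — an idempotent latch that can back the Pod's HTTP readiness endpoint (the real pkg/util/probe may differ):

	package probe

	import (
		"net/http"
		"sync/atomic"
	)

	// ReadyProbe is a goroutine-safe "became ready" latch.
	type ReadyProbe struct {
		ready int32
	}

	// SetReady marks the probe as ready; calling it more than once is harmless.
	func (p *ReadyProbe) SetReady() {
		atomic.StoreInt32(&p.ready, 1)
	}

	// ServeHTTP answers 200 once ready and 503 before that, so the probe can
	// be wired directly into the Pod's readinessProbe endpoint.
	func (p *ReadyProbe) ServeHTTP(w http.ResponseWriter, r *http.Request) {
		if atomic.LoadInt32(&p.ready) == 1 {
			w.WriteHeader(http.StatusOK)
			return
		}
		w.WriteHeader(http.StatusServiceUnavailable)
	}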
71 changes: 68 additions & 3 deletions pkg/operator/operator_leader.go
@@ -23,16 +23,37 @@
package operator

import (
"fmt"
"os"
"time"

"github.com/rs/zerolog"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"

"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
"github.com/arangodb/kube-arangodb/pkg/util/probe"
)

func (o *Operator) runLeaderElection(lockName string, onStart func(stop <-chan struct{})) {
// runLeaderElection performs a leader election on a lock with given name in
// the namespace that the operator is deployed in.
// When the leader election is won, the given callback is called.
// When the leader election is lost (even after it was won once), the process is killed.
Member commented:

This comment confused me. If a new pod tries to get elected and "loses" the election, then the process remains lingering around and will try again. It is only if leadership is lost that the process exits. Please adjust the comment. Code is OK, as far as I see.

Contributor (author) replied:

done

// The given ready probe is set as soon as this process becomes the leader or a new leader
// is detected.
func (o *Operator) runLeaderElection(lockName string, onStart func(stop <-chan struct{}), readyProbe *probe.ReadyProbe) {
namespace := o.Config.Namespace
kubecli := o.Dependencies.KubeCli
log := o.log.With().Str("lock-name", lockName).Logger()
eventTarget := o.getLeaderElectionEventTarget(log)
recordEvent := func(reason, message string) {
if eventTarget != nil {
o.Dependencies.EventRecorder.Event(eventTarget, v1.EventTypeNormal, reason, message)
}
}
rl, err := resourcelock.New(resourcelock.EndpointsResourceLock,
namespace,
lockName,
@@ -51,10 +72,54 @@ func (o *Operator) runLeaderElection(lockName string, onStart func(stop <-chan s
RenewDeadline: 10 * time.Second,
RetryPeriod: 2 * time.Second,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: onStart,
OnStartedLeading: func(stop <-chan struct{}) {
recordEvent("Leader Election Won", fmt.Sprintf("Pod %s is running as leader", o.Config.PodName))
readyProbe.SetReady()
onStart(stop)
},
OnStoppedLeading: func() {
log.Info().Msg("Leader election lost")
recordEvent("Stop Leading", fmt.Sprintf("Pod %s is stopping to run as leader", o.Config.PodName))
log.Info().Msg("Stop leading. Terminating process")
os.Exit(1)
},
OnNewLeader: func(identity string) {
log.Info().Str("identity", identity).Msg("New leader detected")
readyProbe.SetReady()
},
},
})
}
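
Note how the PR title is satisfied for every Pod, not just the winner: OnStartedLeading sets the ready probe on the leader, and OnNewLeader sets it on the non-leaders as soon as any leader is known. Hypothetical wiring of such a probe into an HTTP server (not in this diff; assumes the ServeHTTP sketch above and a listen address of the operator's choosing):

	mux := http.NewServeMux()
	mux.Handle("/ready/deployment", o.Dependencies.DeploymentProbe)
	go http.ListenAndServe(listenAddr, mux) // listenAddr is an assumption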

// getLeaderElectionEventTarget returns the object that leader election related
// events will be added to.
func (o *Operator) getLeaderElectionEventTarget(log zerolog.Logger) runtime.Object {
ns := o.Config.Namespace
kubecli := o.Dependencies.KubeCli
pods := kubecli.CoreV1().Pods(ns)
log = log.With().Str("pod-name", o.Config.PodName).Logger()
pod, err := pods.Get(o.Config.PodName, metav1.GetOptions{})
if err != nil {
log.Error().Err(err).Msg("Cannot find Pod containing this operator")
return nil
}
rSet, err := k8sutil.GetPodOwner(kubecli, pod, ns)
if err != nil {
log.Error().Err(err).Msg("Cannot find ReplicaSet owning the Pod containing this operator")
return pod
}
if rSet == nil {
log.Error().Msg("Pod containing this operator has no ReplicaSet owner")
return pod
}
log = log.With().Str("replicaSet-name", rSet.Name).Logger()
depl, err := k8sutil.GetReplicaSetOwner(kubecli, rSet, ns)
if err != nil {
log.Error().Err(err).Msg("Cannot find Deployment owning the ReplicataSet that owns the Pod containing this operator")
return rSet
}
if depl == nil {
log.Error().Msg("ReplicaSet that owns the Pod containing this operator has no Deployment owner")
return rSet
}
return depl
}
62 changes: 62 additions & 0 deletions pkg/util/k8sutil/owner.go
@@ -0,0 +1,62 @@
//
// DISCLAIMER
//
// Copyright 2018 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
// Author Ewout Prangsma
//

package k8sutil

import (
"k8s.io/api/apps/v1beta2"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
)

// GetPodOwner returns the ReplicaSet that owns the given Pod.
// If the Pod has no owner or the owner is not a ReplicaSet, nil is returned.
func GetPodOwner(kubecli kubernetes.Interface, pod *v1.Pod, ns string) (*v1beta2.ReplicaSet, error) {
for _, ref := range pod.GetOwnerReferences() {
if ref.Kind == "ReplicaSet" {
rSets := kubecli.AppsV1beta2().ReplicaSets(pod.GetNamespace())
rSet, err := rSets.Get(ref.Name, metav1.GetOptions{})
if err != nil {
return nil, maskAny(err)
}
return rSet, nil
}
}
return nil, nil
}

// GetReplicaSetOwner returns the Deployment that owns the given ReplicaSet.
// If the ReplicaSet has no owner or the owner is not a Deployment, nil is returned.
func GetReplicaSetOwner(kubecli kubernetes.Interface, rSet *v1beta2.ReplicaSet, ns string) (*v1beta2.Deployment, error) {
for _, ref := range rSet.GetOwnerReferences() {
if ref.Kind == "Deployment" {
depls := kubecli.AppsV1beta2().Deployments(rSet.GetNamespace())
depl, err := depls.Get(ref.Name, metav1.GetOptions{})
if err != nil {
return nil, maskAny(err)
}
return depl, nil
}
}
return nil, nil
}