diff --git a/deps/github.com/golang/glog/glog_redirect.go b/deps/github.com/golang/glog/glog_redirect.go new file mode 100644 index 000000000..b8a4d78d0 --- /dev/null +++ b/deps/github.com/golang/glog/glog_redirect.go @@ -0,0 +1,89 @@ +// +// DISCLAIMER +// +// Copyright 2018 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// +// Author Ewout Prangsma +// + +// +// Note: This code is added to the standard glog package. +// It has to be here because it needs package level +// access to some members. +// Do not remove this when updating the vendored glog package! +// + +package glog + +import "strings" + +type LogLevel int + +const ( + // Make sure these constants end up having the same indexes as the severity constants + LogLevelInfo LogLevel = iota + LogLevelWarning + LogLevelError + LogLevelFatal +) + +// redirectWriter wraps a callback that is called when data is written to it. +type redirectWriter struct { + cb func(level LogLevel, message string) + level LogLevel +} + +func (w *redirectWriter) Flush() error { + return nil +} + +func (w *redirectWriter) Sync() error { + return nil +} + +func (w *redirectWriter) Write(p []byte) (n int, err error) { + msg := string(p) + if msg[len(msg)-1] == '\n' { + msg = msg[:len(msg)-1] + } + if idx := strings.IndexByte(msg, ']'); idx > 0 { + msg = strings.TrimSpace(msg[idx+1:]) + } + w.cb(w.level, msg) + return len(p), nil +} + +// RedirectOutput redirects output of the given logging to the given callback. +func (l *loggingT) RedirectOutput(cb func(level LogLevel, message string)) { + l.mu.Lock() + defer l.mu.Unlock() + + l.toStderr = false + l.alsoToStderr = false + for i := range logging.file { + logging.file[i] = &redirectWriter{ + cb: cb, + level: LogLevel(i), + } + } + return +} + +// RedirectOutput redirects output of thestandard logging to the given callback. +func RedirectOutput(cb func(level LogLevel, message string)) { + logging.RedirectOutput(cb) +} diff --git a/main.go b/main.go index d93bb5d62..1f37d7685 100644 --- a/main.go +++ b/main.go @@ -29,6 +29,7 @@ import ( "net/http" "os" "strconv" + "strings" "time" "github.com/pkg/errors" @@ -36,8 +37,10 @@ import ( "github.com/rs/zerolog" "github.com/spf13/cobra" flag "github.com/spf13/pflag" + appsv1beta2 "k8s.io/api/apps/v1beta2" "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/kubernetes" v1core "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/tools/record" @@ -116,12 +119,25 @@ func cmdUsage(cmd *cobra.Command, args []string) { // Run the operator func cmdMainRun(cmd *cobra.Command, args []string) { + // Get environment + namespace := os.Getenv(constants.EnvOperatorPodNamespace) + name := os.Getenv(constants.EnvOperatorPodName) + ip := os.Getenv(constants.EnvOperatorPodIP) + + // Prepare log service goflag.CommandLine.Parse([]string{"-logtostderr"}) var err error logService, err = logging.NewService(logLevel) if err != nil { cliLog.Fatal().Err(err).Msg("Failed to initialize log service") } + logService.ConfigureRootLogger(func(log zerolog.Logger) zerolog.Logger { + podNameParts := strings.Split(name, "-") + operatorID := podNameParts[len(podNameParts)-1] + cliLog = cliLog.With().Str("operator-id", operatorID).Logger() + return log.With().Str("operator-id", operatorID).Logger() + }) + logService.CaptureGLog(logService.MustGetLogger("glog")) // Check operating mode if !operatorOptions.enableDeployment && !operatorOptions.enableDeploymentReplication && !operatorOptions.enableStorage { @@ -129,18 +145,18 @@ func cmdMainRun(cmd *cobra.Command, args []string) { } // Log version - cliLog.Info().Msgf("Starting arangodb-operator, version %s build %s", projectVersion, projectBuild) + cliLog.Info(). + Str("pod-name", name). + Str("pod-namespace", namespace). + Msgf("Starting arangodb-operator, version %s build %s", projectVersion, projectBuild) - // Get environment - namespace := os.Getenv(constants.EnvOperatorPodNamespace) + // Check environment if len(namespace) == 0 { cliLog.Fatal().Msgf("%s environment variable missing", constants.EnvOperatorPodNamespace) } - name := os.Getenv(constants.EnvOperatorPodName) if len(name) == 0 { cliLog.Fatal().Msgf("%s environment variable missing", constants.EnvOperatorPodName) } - ip := os.Getenv(constants.EnvOperatorPodIP) if len(ip) == 0 { cliLog.Fatal().Msgf("%s environment variable missing", constants.EnvOperatorPodIP) } @@ -271,5 +287,9 @@ func createRecorder(log zerolog.Logger, kubecli kubernetes.Interface, name, name log.Info().Msgf(format, args...) }) eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: v1core.New(kubecli.Core().RESTClient()).Events(namespace)}) - return eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: name}) + combinedScheme := runtime.NewScheme() + scheme.AddToScheme(combinedScheme) + v1.AddToScheme(combinedScheme) + appsv1beta2.AddToScheme(combinedScheme) + return eventBroadcaster.NewRecorder(combinedScheme, v1.EventSource{Component: name}) } diff --git a/manifests/templates/deployment-replication/rbac.yaml b/manifests/templates/deployment-replication/rbac.yaml index 90df2baf4..d32dc7d50 100644 --- a/manifests/templates/deployment-replication/rbac.yaml +++ b/manifests/templates/deployment-replication/rbac.yaml @@ -33,8 +33,8 @@ rules: resources: ["nodes"] verbs: ["get"] - apiGroups: ["apps"] - resources: ["deployments"] - verbs: ["*"] + resources: ["deployments", "replicasets"] + verbs: ["get"] --- diff --git a/manifests/templates/deployment/rbac.yaml b/manifests/templates/deployment/rbac.yaml index b9362d931..5ccb4e647 100644 --- a/manifests/templates/deployment/rbac.yaml +++ b/manifests/templates/deployment/rbac.yaml @@ -30,8 +30,8 @@ rules: resources: ["nodes"] verbs: ["get"] - apiGroups: ["apps"] - resources: ["deployments"] - verbs: ["*"] + resources: ["deployments", "replicasets"] + verbs: ["get"] - apiGroups: ["storage.k8s.io"] resources: ["storageclasses"] verbs: ["get", "list"] diff --git a/manifests/templates/storage/rbac.yaml b/manifests/templates/storage/rbac.yaml index 6f743d913..fdbc28f99 100644 --- a/manifests/templates/storage/rbac.yaml +++ b/manifests/templates/storage/rbac.yaml @@ -33,6 +33,9 @@ rules: - apiGroups: ["apps"] resources: ["daemonsets"] verbs: ["*"] +- apiGroups: ["apps"] + resources: ["deployments", "replicasets"] + verbs: ["get"] - apiGroups: ["storage.k8s.io"] resources: ["storageclasses"] verbs: ["*"] diff --git a/pkg/logging/logger.go b/pkg/logging/logger.go index 991799f33..25857c03a 100644 --- a/pkg/logging/logger.go +++ b/pkg/logging/logger.go @@ -28,6 +28,7 @@ import ( "strings" "sync" + "github.com/golang/glog" "github.com/rs/zerolog" ) @@ -47,6 +48,10 @@ type Service interface { MustGetLogger(name string) zerolog.Logger // MustSetLevel sets the log level for the component with given name to given level. MustSetLevel(name, level string) + // ConfigureRootLogger calls the given callback to modify the root logger. + ConfigureRootLogger(cb func(rootLog zerolog.Logger) zerolog.Logger) + // CaptureGLog configures glog to write to the given logger + CaptureGLog(log zerolog.Logger) } // loggingService implements Service @@ -83,6 +88,32 @@ func NewService(defaultLevel string) (Service, error) { return s, nil } +// ConfigureRootLogger calls the given callback to modify the root logger. +func (s *loggingService) ConfigureRootLogger(cb func(rootLog zerolog.Logger) zerolog.Logger) { + s.mutex.Lock() + defer s.mutex.Unlock() + + s.rootLog = cb(s.rootLog) +} + +// CaptureGLog configures glog to write to the given logger +func (s *loggingService) CaptureGLog(log zerolog.Logger) { + glog.RedirectOutput(func(level glog.LogLevel, msg string) { + var e *zerolog.Event + switch level { + case glog.LogLevelWarning: + e = log.WithLevel(zerolog.WarnLevel) + case glog.LogLevelError: + e = log.WithLevel(zerolog.ErrorLevel) + case glog.LogLevelFatal: + e = log.WithLevel(zerolog.FatalLevel) + default: + e = log.WithLevel(zerolog.InfoLevel) + } + e.Msg(msg) + }) +} + // MustGetLogger creates a logger with given name func (s *loggingService) MustGetLogger(name string) zerolog.Logger { s.mutex.Lock() diff --git a/pkg/operator/operator.go b/pkg/operator/operator.go index ba43136b4..f4ba43b23 100644 --- a/pkg/operator/operator.go +++ b/pkg/operator/operator.go @@ -105,13 +105,13 @@ func NewOperator(config Config, deps Dependencies) (*Operator, error) { // Run the operator func (o *Operator) Run() { if o.Config.EnableDeployment { - go o.runLeaderElection("arango-deployment-operator", o.onStartDeployment) + go o.runLeaderElection("arango-deployment-operator", o.onStartDeployment, o.Dependencies.DeploymentProbe) } if o.Config.EnableDeploymentReplication { - go o.runLeaderElection("arango-deployment-replication-operator", o.onStartDeploymentReplication) + go o.runLeaderElection("arango-deployment-replication-operator", o.onStartDeploymentReplication, o.Dependencies.DeploymentReplicationProbe) } if o.Config.EnableStorage { - go o.runLeaderElection("arango-storage-operator", o.onStartStorage) + go o.runLeaderElection("arango-storage-operator", o.onStartStorage, o.Dependencies.StorageProbe) } // Wait until process terminates <-context.TODO().Done() diff --git a/pkg/operator/operator_leader.go b/pkg/operator/operator_leader.go index 5d94c2058..26a597f06 100644 --- a/pkg/operator/operator_leader.go +++ b/pkg/operator/operator_leader.go @@ -23,16 +23,37 @@ package operator import ( + "fmt" + "os" "time" + "github.com/rs/zerolog" + "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/tools/leaderelection" "k8s.io/client-go/tools/leaderelection/resourcelock" + + "github.com/arangodb/kube-arangodb/pkg/util/k8sutil" + "github.com/arangodb/kube-arangodb/pkg/util/probe" ) -func (o *Operator) runLeaderElection(lockName string, onStart func(stop <-chan struct{})) { +// runLeaderElection performs a leader election on a lock with given name in +// the namespace that the operator is deployed in. +// When the leader election is won, the given callback is called. +// When the leader election is was won once, but then the leadership is lost, the process is killed. +// The given ready probe is set, as soon as this process became the leader, or a new leader +// is detected. +func (o *Operator) runLeaderElection(lockName string, onStart func(stop <-chan struct{}), readyProbe *probe.ReadyProbe) { namespace := o.Config.Namespace kubecli := o.Dependencies.KubeCli log := o.log.With().Str("lock-name", lockName).Logger() + eventTarget := o.getLeaderElectionEventTarget(log) + recordEvent := func(reason, message string) { + if eventTarget != nil { + o.Dependencies.EventRecorder.Event(eventTarget, v1.EventTypeNormal, reason, message) + } + } rl, err := resourcelock.New(resourcelock.EndpointsResourceLock, namespace, lockName, @@ -51,10 +72,54 @@ func (o *Operator) runLeaderElection(lockName string, onStart func(stop <-chan s RenewDeadline: 10 * time.Second, RetryPeriod: 2 * time.Second, Callbacks: leaderelection.LeaderCallbacks{ - OnStartedLeading: onStart, + OnStartedLeading: func(stop <-chan struct{}) { + recordEvent("Leader Election Won", fmt.Sprintf("Pod %s is running as leader", o.Config.PodName)) + readyProbe.SetReady() + onStart(stop) + }, OnStoppedLeading: func() { - log.Info().Msg("Leader election lost") + recordEvent("Stop Leading", fmt.Sprintf("Pod %s is stopping to run as leader", o.Config.PodName)) + log.Info().Msg("Stop leading. Terminating process") + os.Exit(1) + }, + OnNewLeader: func(identity string) { + log.Info().Str("identity", identity).Msg("New leader detected") + readyProbe.SetReady() }, }, }) } + +// getLeaderElectionEventTarget returns the object that leader election related +// events will be added to. +func (o *Operator) getLeaderElectionEventTarget(log zerolog.Logger) runtime.Object { + ns := o.Config.Namespace + kubecli := o.Dependencies.KubeCli + pods := kubecli.CoreV1().Pods(ns) + log = log.With().Str("pod-name", o.Config.PodName).Logger() + pod, err := pods.Get(o.Config.PodName, metav1.GetOptions{}) + if err != nil { + log.Error().Err(err).Msg("Cannot find Pod containing this operator") + return nil + } + rSet, err := k8sutil.GetPodOwner(kubecli, pod, ns) + if err != nil { + log.Error().Err(err).Msg("Cannot find ReplicaSet owning the Pod containing this operator") + return pod + } + if rSet == nil { + log.Error().Msg("Pod containing this operator has no ReplicaSet owner") + return pod + } + log = log.With().Str("replicaSet-name", rSet.Name).Logger() + depl, err := k8sutil.GetReplicaSetOwner(kubecli, rSet, ns) + if err != nil { + log.Error().Err(err).Msg("Cannot find Deployment owning the ReplicataSet that owns the Pod containing this operator") + return rSet + } + if rSet == nil { + log.Error().Msg("ReplicaSet that owns the Pod containing this operator has no Deployment owner") + return rSet + } + return depl +} diff --git a/pkg/util/k8sutil/owner.go b/pkg/util/k8sutil/owner.go new file mode 100644 index 000000000..2daf15027 --- /dev/null +++ b/pkg/util/k8sutil/owner.go @@ -0,0 +1,62 @@ +// +// DISCLAIMER +// +// Copyright 2018 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// +// Author Ewout Prangsma +// + +package k8sutil + +import ( + "k8s.io/api/apps/v1beta2" + "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" +) + +// GetPodOwner returns the ReplicaSet that owns the given Pod. +// If the Pod has no owner of the owner is not a ReplicaSet, nil is returned. +func GetPodOwner(kubecli kubernetes.Interface, pod *v1.Pod, ns string) (*v1beta2.ReplicaSet, error) { + for _, ref := range pod.GetOwnerReferences() { + if ref.Kind == "ReplicaSet" { + rSets := kubecli.AppsV1beta2().ReplicaSets(pod.GetNamespace()) + rSet, err := rSets.Get(ref.Name, metav1.GetOptions{}) + if err != nil { + return nil, maskAny(err) + } + return rSet, nil + } + } + return nil, nil +} + +// GetReplicaSetOwner returns the Deployment that owns the given ReplicaSet. +// If the ReplicaSet has no owner of the owner is not a Deployment, nil is returned. +func GetReplicaSetOwner(kubecli kubernetes.Interface, rSet *v1beta2.ReplicaSet, ns string) (*v1beta2.Deployment, error) { + for _, ref := range rSet.GetOwnerReferences() { + if ref.Kind == "Deployment" { + depls := kubecli.AppsV1beta2().Deployments(rSet.GetNamespace()) + depl, err := depls.Get(ref.Name, metav1.GetOptions{}) + if err != nil { + return nil, maskAny(err) + } + return depl, nil + } + } + return nil, nil +}