Skip to content

Commit

Permalink
Merge branch 'master' into bugfix/default-storage-class
Browse files Browse the repository at this point in the history
  • Loading branch information
ewoutp committed Jun 26, 2018
2 parents 538ab10 + 5618606 commit 70a04c4
Show file tree
Hide file tree
Showing 9 changed files with 286 additions and 16 deletions.
89 changes: 89 additions & 0 deletions deps/github.com/golang/glog/glog_redirect.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
//
// DISCLAIMER
//
// Copyright 2018 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
// Author Ewout Prangsma
//

//
// Note: This code is added to the standard glog package.
// It has to be here because it needs package level
// access to some members.
// Do not remove this when updating the vendored glog package!
//

package glog

import "strings"

type LogLevel int

const (
// Make sure these constants end up having the same indexes as the severity constants
LogLevelInfo LogLevel = iota
LogLevelWarning
LogLevelError
LogLevelFatal
)

// redirectWriter wraps a callback that is called when data is written to it.
type redirectWriter struct {
cb func(level LogLevel, message string)
level LogLevel
}

func (w *redirectWriter) Flush() error {
return nil
}

func (w *redirectWriter) Sync() error {
return nil
}

func (w *redirectWriter) Write(p []byte) (n int, err error) {
msg := string(p)
if msg[len(msg)-1] == '\n' {
msg = msg[:len(msg)-1]
}
if idx := strings.IndexByte(msg, ']'); idx > 0 {
msg = strings.TrimSpace(msg[idx+1:])
}
w.cb(w.level, msg)
return len(p), nil
}

// RedirectOutput redirects output of the given logging to the given callback.
func (l *loggingT) RedirectOutput(cb func(level LogLevel, message string)) {
l.mu.Lock()
defer l.mu.Unlock()

l.toStderr = false
l.alsoToStderr = false
for i := range logging.file {
logging.file[i] = &redirectWriter{
cb: cb,
level: LogLevel(i),
}
}
return
}

// RedirectOutput redirects output of thestandard logging to the given callback.
func RedirectOutput(cb func(level LogLevel, message string)) {
logging.RedirectOutput(cb)
}
32 changes: 26 additions & 6 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,18 @@ import (
"net/http"
"os"
"strconv"
"strings"
"time"

"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/rs/zerolog"
"github.com/spf13/cobra"
flag "github.com/spf13/pflag"
appsv1beta2 "k8s.io/api/apps/v1beta2"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/record"
Expand Down Expand Up @@ -116,31 +119,44 @@ func cmdUsage(cmd *cobra.Command, args []string) {

// Run the operator
func cmdMainRun(cmd *cobra.Command, args []string) {
// Get environment
namespace := os.Getenv(constants.EnvOperatorPodNamespace)
name := os.Getenv(constants.EnvOperatorPodName)
ip := os.Getenv(constants.EnvOperatorPodIP)

// Prepare log service
goflag.CommandLine.Parse([]string{"-logtostderr"})
var err error
logService, err = logging.NewService(logLevel)
if err != nil {
cliLog.Fatal().Err(err).Msg("Failed to initialize log service")
}
logService.ConfigureRootLogger(func(log zerolog.Logger) zerolog.Logger {
podNameParts := strings.Split(name, "-")
operatorID := podNameParts[len(podNameParts)-1]
cliLog = cliLog.With().Str("operator-id", operatorID).Logger()
return log.With().Str("operator-id", operatorID).Logger()
})
logService.CaptureGLog(logService.MustGetLogger("glog"))

// Check operating mode
if !operatorOptions.enableDeployment && !operatorOptions.enableDeploymentReplication && !operatorOptions.enableStorage {
cliLog.Fatal().Err(err).Msg("Turn on --operator.deployment, --operator.deployment-replication, --operator.storage or any combination of these")
}

// Log version
cliLog.Info().Msgf("Starting arangodb-operator, version %s build %s", projectVersion, projectBuild)
cliLog.Info().
Str("pod-name", name).
Str("pod-namespace", namespace).
Msgf("Starting arangodb-operator, version %s build %s", projectVersion, projectBuild)

// Get environment
namespace := os.Getenv(constants.EnvOperatorPodNamespace)
// Check environment
if len(namespace) == 0 {
cliLog.Fatal().Msgf("%s environment variable missing", constants.EnvOperatorPodNamespace)
}
name := os.Getenv(constants.EnvOperatorPodName)
if len(name) == 0 {
cliLog.Fatal().Msgf("%s environment variable missing", constants.EnvOperatorPodName)
}
ip := os.Getenv(constants.EnvOperatorPodIP)
if len(ip) == 0 {
cliLog.Fatal().Msgf("%s environment variable missing", constants.EnvOperatorPodIP)
}
Expand Down Expand Up @@ -271,5 +287,9 @@ func createRecorder(log zerolog.Logger, kubecli kubernetes.Interface, name, name
log.Info().Msgf(format, args...)
})
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: v1core.New(kubecli.Core().RESTClient()).Events(namespace)})
return eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: name})
combinedScheme := runtime.NewScheme()
scheme.AddToScheme(combinedScheme)
v1.AddToScheme(combinedScheme)
appsv1beta2.AddToScheme(combinedScheme)
return eventBroadcaster.NewRecorder(combinedScheme, v1.EventSource{Component: name})
}
4 changes: 2 additions & 2 deletions manifests/templates/deployment-replication/rbac.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ rules:
resources: ["nodes"]
verbs: ["get"]
- apiGroups: ["apps"]
resources: ["deployments"]
verbs: ["*"]
resources: ["deployments", "replicasets"]
verbs: ["get"]

---

Expand Down
4 changes: 2 additions & 2 deletions manifests/templates/deployment/rbac.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ rules:
resources: ["nodes"]
verbs: ["get"]
- apiGroups: ["apps"]
resources: ["deployments"]
verbs: ["*"]
resources: ["deployments", "replicasets"]
verbs: ["get"]
- apiGroups: ["storage.k8s.io"]
resources: ["storageclasses"]
verbs: ["get", "list"]
Expand Down
3 changes: 3 additions & 0 deletions manifests/templates/storage/rbac.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ rules:
- apiGroups: ["apps"]
resources: ["daemonsets"]
verbs: ["*"]
- apiGroups: ["apps"]
resources: ["deployments", "replicasets"]
verbs: ["get"]
- apiGroups: ["storage.k8s.io"]
resources: ["storageclasses"]
verbs: ["*"]
Expand Down
31 changes: 31 additions & 0 deletions pkg/logging/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"strings"
"sync"

"github.com/golang/glog"
"github.com/rs/zerolog"
)

Expand All @@ -47,6 +48,10 @@ type Service interface {
MustGetLogger(name string) zerolog.Logger
// MustSetLevel sets the log level for the component with given name to given level.
MustSetLevel(name, level string)
// ConfigureRootLogger calls the given callback to modify the root logger.
ConfigureRootLogger(cb func(rootLog zerolog.Logger) zerolog.Logger)
// CaptureGLog configures glog to write to the given logger
CaptureGLog(log zerolog.Logger)
}

// loggingService implements Service
Expand Down Expand Up @@ -83,6 +88,32 @@ func NewService(defaultLevel string) (Service, error) {
return s, nil
}

// ConfigureRootLogger calls the given callback to modify the root logger.
func (s *loggingService) ConfigureRootLogger(cb func(rootLog zerolog.Logger) zerolog.Logger) {
s.mutex.Lock()
defer s.mutex.Unlock()

s.rootLog = cb(s.rootLog)
}

// CaptureGLog configures glog to write to the given logger
func (s *loggingService) CaptureGLog(log zerolog.Logger) {
glog.RedirectOutput(func(level glog.LogLevel, msg string) {
var e *zerolog.Event
switch level {
case glog.LogLevelWarning:
e = log.WithLevel(zerolog.WarnLevel)
case glog.LogLevelError:
e = log.WithLevel(zerolog.ErrorLevel)
case glog.LogLevelFatal:
e = log.WithLevel(zerolog.FatalLevel)
default:
e = log.WithLevel(zerolog.InfoLevel)
}
e.Msg(msg)
})
}

// MustGetLogger creates a logger with given name
func (s *loggingService) MustGetLogger(name string) zerolog.Logger {
s.mutex.Lock()
Expand Down
6 changes: 3 additions & 3 deletions pkg/operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,13 +105,13 @@ func NewOperator(config Config, deps Dependencies) (*Operator, error) {
// Run the operator
func (o *Operator) Run() {
if o.Config.EnableDeployment {
go o.runLeaderElection("arango-deployment-operator", o.onStartDeployment)
go o.runLeaderElection("arango-deployment-operator", o.onStartDeployment, o.Dependencies.DeploymentProbe)
}
if o.Config.EnableDeploymentReplication {
go o.runLeaderElection("arango-deployment-replication-operator", o.onStartDeploymentReplication)
go o.runLeaderElection("arango-deployment-replication-operator", o.onStartDeploymentReplication, o.Dependencies.DeploymentReplicationProbe)
}
if o.Config.EnableStorage {
go o.runLeaderElection("arango-storage-operator", o.onStartStorage)
go o.runLeaderElection("arango-storage-operator", o.onStartStorage, o.Dependencies.StorageProbe)
}
// Wait until process terminates
<-context.TODO().Done()
Expand Down
71 changes: 68 additions & 3 deletions pkg/operator/operator_leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,37 @@
package operator

import (
"fmt"
"os"
"time"

"github.com/rs/zerolog"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"

"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
"github.com/arangodb/kube-arangodb/pkg/util/probe"
)

func (o *Operator) runLeaderElection(lockName string, onStart func(stop <-chan struct{})) {
// runLeaderElection performs a leader election on a lock with given name in
// the namespace that the operator is deployed in.
// When the leader election is won, the given callback is called.
// When the leader election is was won once, but then the leadership is lost, the process is killed.
// The given ready probe is set, as soon as this process became the leader, or a new leader
// is detected.
func (o *Operator) runLeaderElection(lockName string, onStart func(stop <-chan struct{}), readyProbe *probe.ReadyProbe) {
namespace := o.Config.Namespace
kubecli := o.Dependencies.KubeCli
log := o.log.With().Str("lock-name", lockName).Logger()
eventTarget := o.getLeaderElectionEventTarget(log)
recordEvent := func(reason, message string) {
if eventTarget != nil {
o.Dependencies.EventRecorder.Event(eventTarget, v1.EventTypeNormal, reason, message)
}
}
rl, err := resourcelock.New(resourcelock.EndpointsResourceLock,
namespace,
lockName,
Expand All @@ -51,10 +72,54 @@ func (o *Operator) runLeaderElection(lockName string, onStart func(stop <-chan s
RenewDeadline: 10 * time.Second,
RetryPeriod: 2 * time.Second,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: onStart,
OnStartedLeading: func(stop <-chan struct{}) {
recordEvent("Leader Election Won", fmt.Sprintf("Pod %s is running as leader", o.Config.PodName))
readyProbe.SetReady()
onStart(stop)
},
OnStoppedLeading: func() {
log.Info().Msg("Leader election lost")
recordEvent("Stop Leading", fmt.Sprintf("Pod %s is stopping to run as leader", o.Config.PodName))
log.Info().Msg("Stop leading. Terminating process")
os.Exit(1)
},
OnNewLeader: func(identity string) {
log.Info().Str("identity", identity).Msg("New leader detected")
readyProbe.SetReady()
},
},
})
}

// getLeaderElectionEventTarget returns the object that leader election related
// events will be added to.
func (o *Operator) getLeaderElectionEventTarget(log zerolog.Logger) runtime.Object {
ns := o.Config.Namespace
kubecli := o.Dependencies.KubeCli
pods := kubecli.CoreV1().Pods(ns)
log = log.With().Str("pod-name", o.Config.PodName).Logger()
pod, err := pods.Get(o.Config.PodName, metav1.GetOptions{})
if err != nil {
log.Error().Err(err).Msg("Cannot find Pod containing this operator")
return nil
}
rSet, err := k8sutil.GetPodOwner(kubecli, pod, ns)
if err != nil {
log.Error().Err(err).Msg("Cannot find ReplicaSet owning the Pod containing this operator")
return pod
}
if rSet == nil {
log.Error().Msg("Pod containing this operator has no ReplicaSet owner")
return pod
}
log = log.With().Str("replicaSet-name", rSet.Name).Logger()
depl, err := k8sutil.GetReplicaSetOwner(kubecli, rSet, ns)
if err != nil {
log.Error().Err(err).Msg("Cannot find Deployment owning the ReplicataSet that owns the Pod containing this operator")
return rSet
}
if rSet == nil {
log.Error().Msg("ReplicaSet that owns the Pod containing this operator has no Deployment owner")
return rSet
}
return depl
}
Loading

0 comments on commit 70a04c4

Please sign in to comment.