diff --git a/cmd/redisoperator/main.go b/cmd/redisoperator/main.go index 3c25387f5..ea18cc019 100644 --- a/cmd/redisoperator/main.go +++ b/cmd/redisoperator/main.go @@ -3,9 +3,11 @@ package main import ( "context" "fmt" + "io/ioutil" "net/http" "os" "os/signal" + "strings" "syscall" "time" @@ -74,19 +76,22 @@ func (m *Main) Run() error { }() // Kubernetes clients. - stdclient, customclient, aeClientset, err := utils.CreateKubernetesClients(m.flags) + k8sClient, customClient, aeClientset, err := utils.CreateKubernetesClients(m.flags) if err != nil { return err } // Create kubernetes service. - k8sservice := k8s.New(stdclient, customclient, aeClientset, m.logger) + k8sservice := k8s.New(k8sClient, customClient, aeClientset, m.logger) // Create the redis clients redisClient := redis.New() + // Get lease lock resource namespace + lockNamespace := getNamespace() + // Create operator and run. - redisfailoverOperator, err := redisfailover.New(m.flags.ToRedisOperatorConfig(), k8sservice, redisClient, metricsRecorder, m.logger) + redisfailoverOperator, err := redisfailover.New(m.flags.ToRedisOperatorConfig(), k8sservice, k8sClient, lockNamespace, redisClient, metricsRecorder, m.logger) if err != nil { return err } @@ -124,6 +129,26 @@ func (m *Main) stop(stopC chan struct{}) { time.Sleep(gracePeriod) } +func getNamespace() string { + // This way assumes you've set the POD_NAMESPACE environment + // variable using the downward API. This check has to be done first + // for backwards compatibility with the way InClusterConfig was + // originally set up + if ns, ok := os.LookupEnv("POD_NAMESPACE"); ok { + return ns + } + + // Fall back to the namespace associated with the service account + // token, if available + if data, err := ioutil.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/namespace"); err == nil { + if ns := strings.TrimSpace(string(data)); len(ns) > 0 { + return ns + } + } + + return "default" +} + // Run app. func main() { logger := log.Base() diff --git a/example/operator/operator.yaml b/example/operator/operator.yaml index bce8c0684..3fd4effba 100644 --- a/example/operator/operator.yaml +++ b/example/operator/operator.yaml @@ -5,7 +5,7 @@ metadata: app: redisoperator name: redisoperator spec: - replicas: 1 + replicas: 2 selector: matchLabels: app: redisoperator @@ -18,18 +18,24 @@ spec: spec: serviceAccountName: redisoperator containers: - - image: quay.io/spotahome/redis-operator:v1.1.0 - imagePullPolicy: IfNotPresent - name: app - securityContext: - readOnlyRootFilesystem: true - runAsNonRoot: true - runAsUser: 1000 - resources: - limits: - cpu: 100m - memory: 50Mi - requests: - cpu: 10m - memory: 50Mi + - image: quay.io/spotahome/redis-operator:v1.1.0 + imagePullPolicy: IfNotPresent + name: app + securityContext: + readOnlyRootFilesystem: true + runAsNonRoot: true + runAsUser: 1000 + resources: + limits: + cpu: 100m + memory: 50Mi + requests: + cpu: 10m + memory: 50Mi + env: + - name: POD_NAMESPACE + valueFrom: + fieldRef: + apiVersion: v1 + fieldPath: metadata.namespace restartPolicy: Always diff --git a/operator/redisfailover/factory.go b/operator/redisfailover/factory.go index bc1e3ee14..ac92a9f2b 100644 --- a/operator/redisfailover/factory.go +++ b/operator/redisfailover/factory.go @@ -5,10 +5,12 @@ import ( "time" "github.com/spotahome/kooper/v2/controller" + "github.com/spotahome/kooper/v2/controller/leaderelection" kooperlog "github.com/spotahome/kooper/v2/log" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/cache" "github.com/spotahome/redis-operator/log" @@ -21,11 +23,12 @@ import ( const ( resync = 30 * time.Second operatorName = "redis-operator" + lockKey = "redis-failover-lease" ) // New will create an operator that is responsible of managing all the required stuff // to create redis failovers. -func New(cfg Config, k8sService k8s.Services, redisClient redis.Client, kooperMetricsRecorder metrics.Recorder, logger log.Logger) (controller.Controller, error) { +func New(cfg Config, k8sService k8s.Services, k8sClient kubernetes.Interface, lockNamespace string, redisClient redis.Client, kooperMetricsRecorder metrics.Recorder, logger log.Logger) (controller.Controller, error) { // Create internal services. rfService := rfservice.NewRedisFailoverKubeClient(k8sService, logger) rfChecker := rfservice.NewRedisFailoverChecker(k8sService, redisClient, logger) @@ -35,12 +38,20 @@ func New(cfg Config, k8sService k8s.Services, redisClient redis.Client, kooperMe rfHandler := NewRedisFailoverHandler(cfg, rfService, rfChecker, rfHealer, k8sService, kooperMetricsRecorder, logger) rfRetriever := NewRedisFailoverRetriever(k8sService) + kooperLogger := kooperlogger{Logger: logger.WithField("operator", "redisfailover")} + // Leader election service. + leSVC, err := leaderelection.NewDefault(lockKey, lockNamespace, k8sClient, kooperLogger) + if err != nil { + return nil, err + } + // Create our controller. return controller.New(&controller.Config{ Handler: rfHandler, Retriever: rfRetriever, + LeaderElector: leSVC, MetricsRecorder: kooperMetricsRecorder, - Logger: kooperlogger{Logger: logger.WithField("operator", "redisfailover")}, + Logger: kooperLogger, Name: "redisfailover", ResyncInterval: resync, }) diff --git a/test/integration/redisfailover/creation_test.go b/test/integration/redisfailover/creation_test.go index 524d3a4be..8ba731ea0 100644 --- a/test/integration/redisfailover/creation_test.go +++ b/test/integration/redisfailover/creation_test.go @@ -76,21 +76,21 @@ func TestRedisFailover(t *testing.T) { } // Kubernetes clients. - stdclient, customclient, aeClientset, err := utils.CreateKubernetesClients(flags) + k8sClient, customClient, aeClientset, err := utils.CreateKubernetesClients(flags) require.NoError(err) // Create the redis clients redisClient := redis.New() clients := clients{ - k8sClient: stdclient, - rfClient: customclient, + k8sClient: k8sClient, + rfClient: customClient, aeClient: aeClientset, redisClient: redisClient, } // Create kubernetes service. - k8sservice := k8s.New(stdclient, customclient, aeClientset, log.Dummy) + k8sservice := k8s.New(k8sClient, customClient, aeClientset, log.Dummy) // Prepare namespace prepErr := clients.prepareNS() @@ -100,7 +100,7 @@ func TestRedisFailover(t *testing.T) { time.Sleep(15 * time.Second) // Create operator and run. - redisfailoverOperator, err := redisfailover.New(redisfailover.Config{}, k8sservice, redisClient, metrics.Dummy, log.Dummy) + redisfailoverOperator, err := redisfailover.New(redisfailover.Config{}, k8sservice, k8sClient, namespace, redisClient, metrics.Dummy, log.Dummy) require.NoError(err) go func() { @@ -123,7 +123,7 @@ func TestRedisFailover(t *testing.T) { "password": []byte(testPass), }, } - _, err = stdclient.CoreV1().Secrets(namespace).Create(context.Background(), secret, metav1.CreateOptions{}) + _, err = k8sClient.CoreV1().Secrets(namespace).Create(context.Background(), secret, metav1.CreateOptions{}) require.NoError(err) // Check that if we create a RedisFailover, it is certainly created and we can get it