Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add leader election #438

Merged
merged 1 commit into from
Aug 30, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 28 additions & 3 deletions cmd/redisoperator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ package main
import (
"context"
"fmt"
"io/ioutil"
"net/http"
"os"
"os/signal"
"strings"
"syscall"
"time"

Expand Down Expand Up @@ -74,19 +76,22 @@ func (m *Main) Run() error {
}()

// Kubernetes clients.
stdclient, customclient, aeClientset, err := utils.CreateKubernetesClients(m.flags)
k8sClient, customClient, aeClientset, err := utils.CreateKubernetesClients(m.flags)
if err != nil {
return err
}

// Create kubernetes service.
k8sservice := k8s.New(stdclient, customclient, aeClientset, m.logger)
k8sservice := k8s.New(k8sClient, customClient, aeClientset, m.logger)

// Create the redis clients
redisClient := redis.New()

// Get lease lock resource namespace
lockNamespace := getNamespace()

// Create operator and run.
redisfailoverOperator, err := redisfailover.New(m.flags.ToRedisOperatorConfig(), k8sservice, redisClient, metricsRecorder, m.logger)
redisfailoverOperator, err := redisfailover.New(m.flags.ToRedisOperatorConfig(), k8sservice, k8sClient, lockNamespace, redisClient, metricsRecorder, m.logger)
if err != nil {
return err
}
Expand Down Expand Up @@ -124,6 +129,26 @@ func (m *Main) stop(stopC chan struct{}) {
time.Sleep(gracePeriod)
}

func getNamespace() string {
// This way assumes you've set the POD_NAMESPACE environment
// variable using the downward API. This check has to be done first
// for backwards compatibility with the way InClusterConfig was
// originally set up
if ns, ok := os.LookupEnv("POD_NAMESPACE"); ok {
return ns
}

// Fall back to the namespace associated with the service account
// token, if available
if data, err := ioutil.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/namespace"); err == nil {
if ns := strings.TrimSpace(string(data)); len(ns) > 0 {
return ns
}
}

return "default"
}

// Run app.
func main() {
logger := log.Base()
Expand Down
36 changes: 21 additions & 15 deletions example/operator/operator.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ metadata:
app: redisoperator
name: redisoperator
spec:
replicas: 1
replicas: 2
selector:
matchLabels:
app: redisoperator
Expand All @@ -18,18 +18,24 @@ spec:
spec:
serviceAccountName: redisoperator
containers:
- image: quay.io/spotahome/redis-operator:v1.1.0
imagePullPolicy: IfNotPresent
name: app
securityContext:
readOnlyRootFilesystem: true
runAsNonRoot: true
runAsUser: 1000
resources:
limits:
cpu: 100m
memory: 50Mi
requests:
cpu: 10m
memory: 50Mi
- image: quay.io/spotahome/redis-operator:v1.1.0
imagePullPolicy: IfNotPresent
name: app
securityContext:
readOnlyRootFilesystem: true
runAsNonRoot: true
runAsUser: 1000
resources:
limits:
cpu: 100m
memory: 50Mi
requests:
cpu: 10m
memory: 50Mi
env:
- name: POD_NAMESPACE
valueFrom:
fieldRef:
apiVersion: v1
fieldPath: metadata.namespace
restartPolicy: Always
15 changes: 13 additions & 2 deletions operator/redisfailover/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@ import (
"time"

"github.com/spotahome/kooper/v2/controller"
"github.com/spotahome/kooper/v2/controller/leaderelection"
kooperlog "github.com/spotahome/kooper/v2/log"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"

"github.com/spotahome/redis-operator/log"
Expand All @@ -21,11 +23,12 @@ import (
const (
resync = 30 * time.Second
operatorName = "redis-operator"
lockKey = "redis-failover-lease"
)

// New will create an operator that is responsible of managing all the required stuff
// to create redis failovers.
func New(cfg Config, k8sService k8s.Services, redisClient redis.Client, kooperMetricsRecorder metrics.Recorder, logger log.Logger) (controller.Controller, error) {
func New(cfg Config, k8sService k8s.Services, k8sClient kubernetes.Interface, lockNamespace string, redisClient redis.Client, kooperMetricsRecorder metrics.Recorder, logger log.Logger) (controller.Controller, error) {
// Create internal services.
rfService := rfservice.NewRedisFailoverKubeClient(k8sService, logger)
rfChecker := rfservice.NewRedisFailoverChecker(k8sService, redisClient, logger)
Expand All @@ -35,12 +38,20 @@ func New(cfg Config, k8sService k8s.Services, redisClient redis.Client, kooperMe
rfHandler := NewRedisFailoverHandler(cfg, rfService, rfChecker, rfHealer, k8sService, kooperMetricsRecorder, logger)
rfRetriever := NewRedisFailoverRetriever(k8sService)

kooperLogger := kooperlogger{Logger: logger.WithField("operator", "redisfailover")}
// Leader election service.
leSVC, err := leaderelection.NewDefault(lockKey, lockNamespace, k8sClient, kooperLogger)
if err != nil {
return nil, err
}

// Create our controller.
return controller.New(&controller.Config{
Handler: rfHandler,
Retriever: rfRetriever,
LeaderElector: leSVC,
MetricsRecorder: kooperMetricsRecorder,
Logger: kooperlogger{Logger: logger.WithField("operator", "redisfailover")},
Logger: kooperLogger,
Name: "redisfailover",
ResyncInterval: resync,
})
Expand Down
12 changes: 6 additions & 6 deletions test/integration/redisfailover/creation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,21 +76,21 @@ func TestRedisFailover(t *testing.T) {
}

// Kubernetes clients.
stdclient, customclient, aeClientset, err := utils.CreateKubernetesClients(flags)
k8sClient, customClient, aeClientset, err := utils.CreateKubernetesClients(flags)
require.NoError(err)

// Create the redis clients
redisClient := redis.New()

clients := clients{
k8sClient: stdclient,
rfClient: customclient,
k8sClient: k8sClient,
rfClient: customClient,
aeClient: aeClientset,
redisClient: redisClient,
}

// Create kubernetes service.
k8sservice := k8s.New(stdclient, customclient, aeClientset, log.Dummy)
k8sservice := k8s.New(k8sClient, customClient, aeClientset, log.Dummy)

// Prepare namespace
prepErr := clients.prepareNS()
Expand All @@ -100,7 +100,7 @@ func TestRedisFailover(t *testing.T) {
time.Sleep(15 * time.Second)

// Create operator and run.
redisfailoverOperator, err := redisfailover.New(redisfailover.Config{}, k8sservice, redisClient, metrics.Dummy, log.Dummy)
redisfailoverOperator, err := redisfailover.New(redisfailover.Config{}, k8sservice, k8sClient, namespace, redisClient, metrics.Dummy, log.Dummy)
require.NoError(err)

go func() {
Expand All @@ -123,7 +123,7 @@ func TestRedisFailover(t *testing.T) {
"password": []byte(testPass),
},
}
_, err = stdclient.CoreV1().Secrets(namespace).Create(context.Background(), secret, metav1.CreateOptions{})
_, err = k8sClient.CoreV1().Secrets(namespace).Create(context.Background(), secret, metav1.CreateOptions{})
require.NoError(err)

// Check that if we create a RedisFailover, it is certainly created and we can get it
Expand Down