Skip to content

Commit

Permalink
refactor controller and redis package
Browse files Browse the repository at this point in the history
redis no longer waits for kubernetes to report all containers as
ready and running. It acts on the best effort basis instantiating
replication once the minimum replication size (2) is met.

Signed-off-by: Nick Revin <[email protected]>
  • Loading branch information
nrvnrvn committed Jul 16, 2019
1 parent 878ce66 commit a56a004
Show file tree
Hide file tree
Showing 4 changed files with 444 additions and 415 deletions.
58 changes: 30 additions & 28 deletions pkg/controller/redis/object_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,23 @@ package redis

import (
"crypto/sha256"
"encoding/hex"
"encoding/json"
"fmt"
"reflect"
"runtime"
"strings"

k8sv1alpha1 "github.com/amaizfinance/redis-operator/pkg/apis/k8s/v1alpha1"
"github.com/amaizfinance/redis-operator/pkg/redis"

"golang.org/x/crypto/argon2"

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
policyv1beta1 "k8s.io/api/policy/v1beta1"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
k8sruntime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/intstr"

k8sv1alpha1 "github.com/amaizfinance/redis-operator/pkg/apis/k8s/v1alpha1"
"github.com/amaizfinance/redis-operator/pkg/redis"
)

const (
Expand Down Expand Up @@ -113,7 +112,7 @@ type objectGeneratorOptions struct {
}

// generateObject is a Kubernetes object factory, returns the name of the object and the object itself
func generateObject(r *k8sv1alpha1.Redis, object k8sruntime.Object, options objectGeneratorOptions) (string, k8sruntime.Object) {
func generateObject(r *k8sv1alpha1.Redis, object k8sruntime.Object, options objectGeneratorOptions) k8sruntime.Object {
switch object.(type) {
case *corev1.Secret:
return generateSecret(r, options.password)
Expand All @@ -126,7 +125,7 @@ func generateObject(r *k8sv1alpha1.Redis, object k8sruntime.Object, options obje
case *appsv1.StatefulSet:
return generateStatefulSet(r, options.password)
}
return "", nil
return nil
}

// objectUpdateNeeded compares two generic Kubernetes objects and updates the fields that differ.
Expand All @@ -148,46 +147,46 @@ func objectUpdateNeeded(got, want k8sruntime.Object) (needed bool) {
}

// resource generators
func generateSecret(r *k8sv1alpha1.Redis, password string) (string, *corev1.Secret) {
func generateSecret(r *k8sv1alpha1.Redis, password string) *corev1.Secret {
var b strings.Builder
defer b.Reset()
fmt.Fprintf(&b, authConfTemplate, password)
_, _ = fmt.Fprintf(&b, authConfTemplate, password)

return generateName(r), &corev1.Secret{
return &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{Name: generateName(r), Namespace: r.Namespace, Labels: r.Labels},
Data: map[string][]byte{secretFileName: []byte(b.String())},
}
}

func generateConfigMap(r *k8sv1alpha1.Redis, master redis.Address) (string, *corev1.ConfigMap) {
func generateConfigMap(r *k8sv1alpha1.Redis, master redis.Address) *corev1.ConfigMap {
var b strings.Builder
defer b.Reset()
// explicitly set the working directory
fmt.Fprintf(&b, "# Generated by redis-operator for redis.k8s.amaiz.com/%s\ndir %s\n", r.Name, workingDir)
_, _ = fmt.Fprintf(&b, "# Generated by redis-operator for redis.k8s.amaiz.com/%s\ndir %s\n", r.Name, workingDir)

if r.Spec.Password.SecretKeyRef != nil {
fmt.Fprintf(&b, "include %s\n", secretMountPath)
_, _ = fmt.Fprintf(&b, "include %s\n", secretMountPath)
}

for k, v := range r.Spec.Config {
if _, ok := excludedConfigDirectives[k]; !ok {
fmt.Fprintf(&b, "%s %s\n", k, v)
_, _ = fmt.Fprintf(&b, "%s %s\n", k, v)
}
}

if master != (redis.Address{}) {
fmt.Fprintf(&b, "replicaof %s %d\n", master.Host, redis.Port)
_, _ = fmt.Fprintf(&b, "replicaof %s %d\n", master.Host, redis.Port)
}

return generateName(r), &corev1.ConfigMap{
return &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{Name: generateName(r), Namespace: r.Namespace, Labels: r.Labels},
Data: map[string]string{configFileName: b.String()}}
}

func generateService(r *k8sv1alpha1.Redis, serviceType int) (string, *corev1.Service) {
func generateService(r *k8sv1alpha1.Redis, serviceType int) *corev1.Service {
var name, clusterIP string
var selector map[string]string
labels := map[string]string{}
labels := make(map[string]string)
for k, v := range r.Labels {
labels[k] = v
}
Expand All @@ -213,6 +212,7 @@ func generateService(r *k8sv1alpha1.Redis, serviceType int) (string, *corev1.Ser
Port: redisPort,
TargetPort: intstr.FromInt(redisPort),
}}

if !reflect.DeepEqual(r.Spec.Exporter, k8sv1alpha1.ContainerSpec{}) {
ports = append(ports, corev1.ServicePort{
Name: exporterName,
Expand All @@ -222,7 +222,7 @@ func generateService(r *k8sv1alpha1.Redis, serviceType int) (string, *corev1.Ser
})
}

return name, &corev1.Service{
return &corev1.Service{
ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: r.Namespace, Labels: labels},
Spec: corev1.ServiceSpec{
Ports: ports,
Expand All @@ -233,8 +233,8 @@ func generateService(r *k8sv1alpha1.Redis, serviceType int) (string, *corev1.Ser
}
}

func generatePodDisruptionBudget(r *k8sv1alpha1.Redis) (string, *policyv1beta1.PodDisruptionBudget) {
return generateName(r), &policyv1beta1.PodDisruptionBudget{
func generatePodDisruptionBudget(r *k8sv1alpha1.Redis) *policyv1beta1.PodDisruptionBudget {
return &policyv1beta1.PodDisruptionBudget{
ObjectMeta: metav1.ObjectMeta{Name: generateName(r), Namespace: r.Namespace, Labels: r.Labels},
Spec: policyv1beta1.PodDisruptionBudgetSpec{
MinAvailable: &[]intstr.IntOrString{intstr.FromInt(redis.MinimumFailoverSize)}[0],
Expand All @@ -243,7 +243,7 @@ func generatePodDisruptionBudget(r *k8sv1alpha1.Redis) (string, *policyv1beta1.P
}
}

func generateStatefulSet(r *k8sv1alpha1.Redis, password string) (string, *appsv1.StatefulSet) {
func generateStatefulSet(r *k8sv1alpha1.Redis, password string) *appsv1.StatefulSet {
// VolumeMount names
configMapMountName := fmt.Sprintf("%s-config", generateName(r))
secretMountName := fmt.Sprintf("%s-secret", generateName(r))
Expand Down Expand Up @@ -286,7 +286,7 @@ func generateStatefulSet(r *k8sv1alpha1.Redis, password string) (string, *appsv1
if r.Spec.Password.SecretKeyRef != nil {
// rotating passwords requires Pod restarts.
// adding password hash as the pod annotation will automatically trigger rolling pod restarts.
r.Spec.Annotations[passwordHashKey] = fmt.Sprintf("%x", argon2.IDKey(
r.Spec.Annotations[passwordHashKey] = hex.EncodeToString(argon2.IDKey(
[]byte(password), []byte(r.UID), argonTime, argonMemory, argonThreads, hashLen,
))

Expand Down Expand Up @@ -314,7 +314,7 @@ func generateStatefulSet(r *k8sv1alpha1.Redis, password string) (string, *appsv1
})
}

volumeClaimTemplates := []corev1.PersistentVolumeClaim{}
var volumeClaimTemplates []corev1.PersistentVolumeClaim
if !reflect.DeepEqual(r.Spec.DataVolumeClaimTemplate, corev1.PersistentVolumeClaim{}) {
volumeClaimTemplates = append(volumeClaimTemplates, r.Spec.DataVolumeClaimTemplate)
containers[0].VolumeMounts = append(containers[0].VolumeMounts, corev1.VolumeMount{
Expand Down Expand Up @@ -362,7 +362,7 @@ func generateStatefulSet(r *k8sv1alpha1.Redis, password string) (string, *appsv1
}

s := &appsv1.StatefulSet{
ObjectMeta: metav1.ObjectMeta{Name: generateName(r), Namespace: r.Namespace, Labels: r.Labels, Annotations: map[string]string{}},
ObjectMeta: metav1.ObjectMeta{Name: generateName(r), Namespace: r.Namespace, Labels: r.Labels, Annotations: make(map[string]string)},
Spec: appsv1.StatefulSetSpec{
Replicas: r.Spec.Replicas,
Selector: &metav1.LabelSelector{MatchLabels: r.Labels},
Expand Down Expand Up @@ -394,7 +394,7 @@ func generateStatefulSet(r *k8sv1alpha1.Redis, password string) (string, *appsv1
}
s.Annotations[hashAnnotationKey] = hash

return generateName(r), s
return s
}

// state checkers
Expand Down Expand Up @@ -441,6 +441,7 @@ func serviceUpdateNeeded(got, want *corev1.Service) (needed bool) {
func podDisruptionBudgetUpdateNeeded(got, want *policyv1beta1.PodDisruptionBudget) (needed bool) {
// updating PDB spec is forbidden
// TODO: keep an eye on https://github.com/kubernetes/kubernetes/issues/45398
// bring back PDB spec comparison once the minimum supported k8s version is 1.15
if !mapsEqual(got.Labels, want.Labels) {
got.Labels = want.Labels
return true
Expand All @@ -462,7 +463,8 @@ func statefulSetUpdateNeeded(got, want *appsv1.StatefulSet) (needed bool) {
got.Spec.Replicas = want.Spec.Replicas
needed = true
}
if !deepContains(got.Spec.Template, want.Spec.Template) {

if !deepContains(got.Spec.Template, want.Spec.Template) || (got.Annotations[hashAnnotationKey] != want.Annotations[hashAnnotationKey]) {
got.Spec.Template = want.Spec.Template
needed = true
}
Expand Down Expand Up @@ -500,5 +502,5 @@ func hashObject(object k8sruntime.Object) (string, error) {
return "", err
}

return fmt.Sprintf("%x", hash.Sum(nil)), nil
return hex.EncodeToString(hash.Sum(nil)), nil
}
Loading

0 comments on commit a56a004

Please sign in to comment.