Skip to content

Commit

Permalink
feat: redis-cluster support podAntiAffinity
Browse files Browse the repository at this point in the history
  • Loading branch information
xiaozhuang committed Dec 17, 2024
2 parents d28caf1 + d424796 commit 15c248b
Show file tree
Hide file tree
Showing 8 changed files with 226 additions and 19 deletions.
7 changes: 7 additions & 0 deletions PROJECT
Original file line number Diff line number Diff line change
Expand Up @@ -88,4 +88,11 @@ resources:
kind: RedisSentinel
path: redis-operator/api/v1beta1
version: v1beta1
- group: core
kind: Pod
path: k8s.io/api/core/v1
version: v1
webhooks:
defaulting: true
webhookVersion: v1
version: "3"
31 changes: 16 additions & 15 deletions api/v1beta2/rediscluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,21 +31,22 @@ type RedisClusterSpec struct {
// +kubebuilder:default:=6379
Port *int `json:"port,omitempty"`
// +kubebuilder:default:=v7
ClusterVersion *string `json:"clusterVersion,omitempty"`
RedisLeader RedisLeader `json:"redisLeader,omitempty"`
RedisFollower RedisFollower `json:"redisFollower,omitempty"`
RedisExporter *RedisExporter `json:"redisExporter,omitempty"`
Storage *ClusterStorage `json:"storage,omitempty"`
PodSecurityContext *corev1.PodSecurityContext `json:"podSecurityContext,omitempty"`
PriorityClassName string `json:"priorityClassName,omitempty"`
Resources *corev1.ResourceRequirements `json:"resources,omitempty"`
TLS *TLSConfig `json:"TLS,omitempty"`
ACL *ACLConfig `json:"acl,omitempty"`
InitContainer *InitContainer `json:"initContainer,omitempty"`
Sidecars *[]Sidecar `json:"sidecars,omitempty"`
ServiceAccountName *string `json:"serviceAccountName,omitempty"`
PersistenceEnabled *bool `json:"persistenceEnabled,omitempty"`
EnvVars *[]corev1.EnvVar `json:"env,omitempty"`
ClusterVersion *string `json:"clusterVersion,omitempty"`
RedisLeader RedisLeader `json:"redisLeader,omitempty"`
RedisFollower RedisFollower `json:"redisFollower,omitempty"`
RedisExporter *RedisExporter `json:"redisExporter,omitempty"`
Storage *ClusterStorage `json:"storage,omitempty"`
PodSecurityContext *corev1.PodSecurityContext `json:"podSecurityContext,omitempty"`
PriorityClassName string `json:"priorityClassName,omitempty"`
Resources *corev1.ResourceRequirements `json:"resources,omitempty"`
TLS *TLSConfig `json:"TLS,omitempty"`
ACL *ACLConfig `json:"acl,omitempty"`
InitContainer *InitContainer `json:"initContainer,omitempty"`
Sidecars *[]Sidecar `json:"sidecars,omitempty"`
ServiceAccountName *string `json:"serviceAccountName,omitempty"`
PersistenceEnabled *bool `json:"persistenceEnabled,omitempty"`
EnvVars *[]corev1.EnvVar `json:"env,omitempty"`
LeaderFollowerPodAntiAffinity *bool `json:"leaderFollowerPodAntiAffinity,omitempty"`
}

func (cr *RedisClusterSpec) GetReplicaCounts(t string) int32 {
Expand Down
5 changes: 5 additions & 0 deletions api/v1beta2/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -6598,6 +6598,8 @@ spec:
required:
- image
type: object
leaderFollowerPodAntiAffinity:
type: boolean
persistenceEnabled:
type: boolean
podSecurityContext:
Expand Down
8 changes: 4 additions & 4 deletions config/default/kustomization.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ bases:
- ../manager
# [WEBHOOK] To enable webhook, uncomment all the sections with [WEBHOOK] prefix including the one in
# crd/kustomization.yaml
#- ../webhook
- ../webhook
# [CERTMANAGER] To enable cert-manager, uncomment all sections with 'CERTMANAGER'. 'WEBHOOK' components are required.
#- ../certmanager
- ../certmanager
# [PROMETHEUS] To enable prometheus monitor, uncomment all sections with 'PROMETHEUS'.
#- ../prometheus

Expand All @@ -36,12 +36,12 @@ bases:

# [WEBHOOK] To enable webhook, uncomment all the sections with [WEBHOOK] prefix including the one in
# crd/kustomization.yaml
#- manager_webhook_patch.yaml
- manager_webhook_patch.yaml

# [CERTMANAGER] To enable cert-manager, uncomment all sections with 'CERTMANAGER'.
# Uncomment 'CERTMANAGER' sections in crd/kustomization.yaml to enable the CA injection in the admission webhooks.
# 'CERTMANAGER' needs to be enabled to use ca injection
#- webhookcainjection_patch.yaml
- webhookcainjection_patch.yaml

# the following config is for teaching kustomize how to do var substitution
#vars:
Expand Down
1 change: 1 addition & 0 deletions example/v1beta2/redis-cluster.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -76,3 +76,4 @@ spec:
# priorityClassName:
# Affinity:
# Tolerations: []
leaderFollowerPodAntiAffinity: false
6 changes: 6 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package main
import (
"flag"
"os"
"sigs.k8s.io/controller-runtime/pkg/webhook/admission"
"strings"

redisv1beta1 "github.com/OT-CONTAINER-KIT/redis-operator/api/v1beta1"
Expand All @@ -29,6 +30,7 @@ import (
"github.com/OT-CONTAINER-KIT/redis-operator/pkg/controllers/redissentinel"
intctrlutil "github.com/OT-CONTAINER-KIT/redis-operator/pkg/controllerutil"
"github.com/OT-CONTAINER-KIT/redis-operator/pkg/k8sutils"
coreWebhook "github.com/OT-CONTAINER-KIT/redis-operator/pkg/webhook"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
Expand Down Expand Up @@ -176,6 +178,10 @@ func main() {
setupLog.Error(err, "unable to create webhook", "webhook", "RedisSentinel")
os.Exit(1)
}

wblog := ctrl.Log.WithName("webhook").WithName("PodAffiniytMutate")
mgr.GetWebhookServer().Register("/mutate-core-v1-pod", &webhook.Admission{
Handler: coreWebhook.NewPodAffiniytMutate(mgr.GetClient(), admission.NewDecoder(scheme), wblog)})
}
// +kubebuilder:scaffold:builder

Expand Down
185 changes: 185 additions & 0 deletions pkg/webhook/pod_webhook.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
/*
Copyright 2020 Opstree Solutions.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package webhook

import (
"context"
"encoding/json"
"github.com/OT-CONTAINER-KIT/redis-operator/api/v1beta2"
"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"net/http"
"reflect"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/webhook/admission"
"strings"
)

//+kubebuilder:webhook:path=/mutate-core-v1-pod,mutating=true,failurePolicy=fail,sideEffects=None,groups=core,resources=pods,verbs=create,versions=v1,name=mpod.kb.io,admissionReviewVersions=v1

// PodAntiAffiniytMutate mutate Pods
type PodAntiAffiniytMutate struct {
Client client.Client
decoder *admission.Decoder
logger logr.Logger
}

func NewPodAffiniytMutate(c client.Client, d *admission.Decoder, log logr.Logger) admission.Handler {
return &PodAntiAffiniytMutate{
Client: c,
decoder: d,
logger: log}
}

const (
podAnnotationsRedisClusterApp = "redis.opstreelabs.instance"
podLabelsPodName = "statefulset.kubernetes.io/pod-name"
podLabelsRedisType = "redis_setup_type"
)

func (v *PodAntiAffiniytMutate) Handle(ctx context.Context, req admission.Request) admission.Response {
logger := v.logger.WithValues("Request.Namespace", req.Namespace, "Request.Name", req.Name)

pod := &corev1.Pod{}
err := v.decoder.Decode(req, pod)
if err != nil {
return admission.Errored(http.StatusBadRequest, err)
}

// only mutate pods that belong to redis cluster
if !v.isRedisClusterPod(pod) {
return admission.Allowed("")
}

// determine whether to add anti affinity
redisCluster, err := v.getRedisClusterSpec(ctx, pod)
if err != nil {
return admission.Errored(http.StatusInternalServerError, err)
}
if !(redisCluster != nil && redisCluster.Spec.LeaderFollowerPodAntiAffinity != nil &&
*redisCluster.Spec.LeaderFollowerPodAntiAffinity == true) {

Check failure on line 75 in pkg/webhook/pod_webhook.go

View workflow job for this annotation

GitHub Actions / lint

S1002: should omit comparison to bool constant, can be simplified to `*redisCluster.Spec.LeaderFollowerPodAntiAffinity` (gosimple)
v.logger.V(1).Info("leader follower pod anti affinity is disabled")
return admission.Allowed("")
}

old := pod.DeepCopy()

v.AddPodAntiAffinity(pod)
if !reflect.DeepEqual(old, pod) {
marshaledPod, err := json.Marshal(pod)
if err != nil {
return admission.Errored(http.StatusInternalServerError, err)
}

logger.Info("mutate pod with anti-affinity")
return admission.PatchResponseFromRaw(req.Object.Raw, marshaledPod)
}

return admission.Allowed("")
}

// PodAntiAffiniytMutate implements admission.DecoderInjector.
// A decoder will be automatically injected.

// InjectDecoder injects the decoder.
func (v *PodAntiAffiniytMutate) InjectDecoder(d *admission.Decoder) error {
v.decoder = d
return nil
}

func (m *PodAntiAffiniytMutate) InjectLogger(l logr.Logger) error {
m.logger = l
return nil
}

func (v *PodAntiAffiniytMutate) AddPodAntiAffinity(pod *corev1.Pod) {
podName := pod.ObjectMeta.Name
antiLabelValue := v.getAntiAffinityValue(podName)

if pod.Spec.Affinity == nil {
pod.Spec.Affinity = &corev1.Affinity{}
}
if pod.Spec.Affinity.PodAntiAffinity == nil {
pod.Spec.Affinity.PodAntiAffinity = &corev1.PodAntiAffinity{}
}
if pod.Spec.Affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution == nil {
pod.Spec.Affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution = make([]corev1.PodAffinityTerm, 0)
}
addAntiAffinity := corev1.PodAffinityTerm{
LabelSelector: &metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: podLabelsPodName,
Operator: metav1.LabelSelectorOpIn,
Values: []string{antiLabelValue},
},
},
},
TopologyKey: "kubernetes.io/hostname",
}

pod.Spec.Affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution = append(pod.Spec.Affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution, addAntiAffinity)
}

func (v *PodAntiAffiniytMutate) getPodAnnotations(pod *corev1.Pod) map[string]string {
if pod.Annotations == nil {
pod.Annotations = make(map[string]string)
}
return pod.Annotations
}

func (v *PodAntiAffiniytMutate) isRedisClusterPod(pod *corev1.Pod) bool {
annotations := v.getPodAnnotations(pod)
if _, ok := annotations[podAnnotationsRedisClusterApp]; !ok {
return false
}

labels := pod.GetLabels()
if _, ok := labels[podLabelsRedisType]; !ok {
return false
}

return true
}

func (v *PodAntiAffiniytMutate) getAntiAffinityValue(podName string) string {
if strings.Contains(podName, "follower") {
return strings.Replace(podName, "follower", "leader", -1)
}
if strings.Contains(podName, "leader") {
return strings.Replace(podName, "leader", "follower", -1)
}
return ""
}

func (v *PodAntiAffiniytMutate) getRedisClusterSpec(ctx context.Context, pod *corev1.Pod) (*v1beta2.RedisCluster, error) {
namespaceKey := client.ObjectKey{
Namespace: pod.Namespace,
Name: pod.Annotations[podAnnotationsRedisClusterApp],
}

redisCluster := &v1beta2.RedisCluster{}

err := v.Client.Get(ctx, namespaceKey, redisCluster)
if err != nil {
v.logger.Error(err, "failed to get redis cluster")
return nil, err
}

return redisCluster, nil
}

0 comments on commit 15c248b

Please sign in to comment.