Skip to content

Commit

Permalink
feat: generate k8s client only once (#750)
Browse files Browse the repository at this point in the history
Signed-off-by: Mathieu Cesbron <[email protected]>
  • Loading branch information
MathieuCesbron authored Jan 14, 2024
1 parent e57531a commit 7665d4a
Show file tree
Hide file tree
Showing 14 changed files with 123 additions and 166 deletions.
4 changes: 2 additions & 2 deletions controllers/redis_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,11 @@ func (r *RedisReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl
return ctrl.Result{}, err
}

err = k8sutils.CreateStandaloneRedis(instance)
err = k8sutils.CreateStandaloneRedis(instance, r.K8sClient)
if err != nil {
return ctrl.Result{}, err
}
err = k8sutils.CreateStandaloneService(instance)
err = k8sutils.CreateStandaloneService(instance, r.K8sClient)
if err != nil {
return ctrl.Result{}, err
}
Expand Down
16 changes: 8 additions & 8 deletions controllers/rediscluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,22 +117,22 @@ func (r *RedisClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request
}

if leaderReplicas != 0 {
err = k8sutils.CreateRedisLeaderService(instance)
err = k8sutils.CreateRedisLeaderService(instance, r.K8sClient)
if err != nil {
return ctrl.Result{}, err
}
}
err = k8sutils.CreateRedisLeader(instance)
err = k8sutils.CreateRedisLeader(instance, r.K8sClient)
if err != nil {
return ctrl.Result{}, err
}

err = k8sutils.ReconcileRedisPodDisruptionBudget(instance, "leader", instance.Spec.RedisLeader.PodDisruptionBudget)
err = k8sutils.ReconcileRedisPodDisruptionBudget(instance, "leader", instance.Spec.RedisLeader.PodDisruptionBudget, r.K8sClient)
if err != nil {
return ctrl.Result{}, err
}

redisLeaderInfo, err := k8sutils.GetStatefulSet(instance.Namespace, instance.ObjectMeta.Name+"-leader")
redisLeaderInfo, err := k8sutils.GetStatefulSet(instance.Namespace, instance.ObjectMeta.Name+"-leader", r.K8sClient)
if err != nil {
if errors.IsNotFound(err) {
return ctrl.Result{RequeueAfter: time.Second * 60}, nil
Expand All @@ -151,21 +151,21 @@ func (r *RedisClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request
}
// if we have followers create their service.
if followerReplicas != 0 {
err = k8sutils.CreateRedisFollowerService(instance)
err = k8sutils.CreateRedisFollowerService(instance, r.K8sClient)
if err != nil {
return ctrl.Result{}, err
}
}
err = k8sutils.CreateRedisFollower(instance)
err = k8sutils.CreateRedisFollower(instance, r.K8sClient)
if err != nil {
return ctrl.Result{}, err
}
err = k8sutils.ReconcileRedisPodDisruptionBudget(instance, "follower", instance.Spec.RedisFollower.PodDisruptionBudget)
err = k8sutils.ReconcileRedisPodDisruptionBudget(instance, "follower", instance.Spec.RedisFollower.PodDisruptionBudget, r.K8sClient)
if err != nil {
return ctrl.Result{}, err
}
}
redisFollowerInfo, err := k8sutils.GetStatefulSet(instance.Namespace, instance.ObjectMeta.Name+"-follower")
redisFollowerInfo, err := k8sutils.GetStatefulSet(instance.Namespace, instance.ObjectMeta.Name+"-follower", r.K8sClient)
if err != nil {
if errors.IsNotFound(err) {
return ctrl.Result{RequeueAfter: time.Second * 60}, nil
Expand Down
6 changes: 3 additions & 3 deletions controllers/redisreplication_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,18 +56,18 @@ func (r *RedisReplicationReconciler) Reconcile(ctx context.Context, req ctrl.Req
return ctrl.Result{}, err
}

err = k8sutils.CreateReplicationRedis(instance)
err = k8sutils.CreateReplicationRedis(instance, r.K8sClient)
if err != nil {
return ctrl.Result{}, err
}
err = k8sutils.CreateReplicationService(instance)
err = k8sutils.CreateReplicationService(instance, r.K8sClient)
if err != nil {
return ctrl.Result{}, err
}

// Set Pod distruptiuon Budget Later

redisReplicationInfo, err := k8sutils.GetStatefulSet(instance.Namespace, instance.ObjectMeta.Name)
redisReplicationInfo, err := k8sutils.GetStatefulSet(instance.Namespace, instance.ObjectMeta.Name, r.K8sClient)
if err != nil {
return ctrl.Result{RequeueAfter: time.Second * 60}, err
}
Expand Down
6 changes: 3 additions & 3 deletions controllers/redissentinel_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,18 +55,18 @@ func (r *RedisSentinelReconciler) Reconcile(ctx context.Context, req ctrl.Reques
}

// Create Redis Sentinel
err = k8sutils.CreateRedisSentinel(ctx, r.K8sClient, r.Log, instance)
err = k8sutils.CreateRedisSentinel(ctx, r.K8sClient, r.Log, instance, r.K8sClient)
if err != nil {
return ctrl.Result{}, err
}

err = k8sutils.ReconcileSentinelPodDisruptionBudget(instance, instance.Spec.PodDisruptionBudget)
err = k8sutils.ReconcileSentinelPodDisruptionBudget(instance, instance.Spec.PodDisruptionBudget, r.K8sClient)
if err != nil {
return ctrl.Result{}, err
}

// Create the Service for Redis Sentinel
err = k8sutils.CreateRedisSentinelService(instance)
err = k8sutils.CreateRedisSentinelService(instance, r.K8sClient)
if err != nil {
return ctrl.Result{}, err
}
Expand Down
65 changes: 23 additions & 42 deletions k8sutils/poddisruption.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,23 +12,24 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/client-go/kubernetes"
)

// CreateRedisLeaderPodDisruptionBudget check and create a PodDisruptionBudget for Leaders
func ReconcileRedisPodDisruptionBudget(cr *redisv1beta2.RedisCluster, role string, pdbParams *commonapi.RedisPodDisruptionBudget) error {
func ReconcileRedisPodDisruptionBudget(cr *redisv1beta2.RedisCluster, role string, pdbParams *commonapi.RedisPodDisruptionBudget, cl kubernetes.Interface) error {
pdbName := cr.ObjectMeta.Name + "-" + role
logger := pdbLogger(cr.Namespace, pdbName)
if pdbParams != nil && pdbParams.Enabled {
labels := getRedisLabels(cr.ObjectMeta.Name, cluster, role, cr.ObjectMeta.GetLabels())
annotations := generateStatefulSetsAnots(cr.ObjectMeta, cr.Spec.KubernetesConfig.IgnoreAnnotations)
pdbMeta := generateObjectMetaInformation(pdbName, cr.Namespace, labels, annotations)
pdbDef := generatePodDisruptionBudgetDef(cr, role, pdbMeta, cr.Spec.RedisLeader.PodDisruptionBudget)
return CreateOrUpdatePodDisruptionBudget(pdbDef)
return CreateOrUpdatePodDisruptionBudget(pdbDef, cl)
} else {
// Check if one exists, and delete it.
_, err := GetPodDisruptionBudget(cr.Namespace, pdbName)
_, err := GetPodDisruptionBudget(cr.Namespace, pdbName, cl)
if err == nil {
return deletePodDisruptionBudget(cr.Namespace, pdbName)
return deletePodDisruptionBudget(cr.Namespace, pdbName, cl)
} else if err != nil && errors.IsNotFound(err) {
logger.V(1).Info("Reconciliation Successful, no PodDisruptionBudget Found.")
// Its ok if its not found, as we're deleting anyway
Expand All @@ -38,20 +39,20 @@ func ReconcileRedisPodDisruptionBudget(cr *redisv1beta2.RedisCluster, role strin
}
}

func ReconcileSentinelPodDisruptionBudget(cr *redisv1beta2.RedisSentinel, pdbParams *commonapi.RedisPodDisruptionBudget) error {
func ReconcileSentinelPodDisruptionBudget(cr *redisv1beta2.RedisSentinel, pdbParams *commonapi.RedisPodDisruptionBudget, cl kubernetes.Interface) error {
pdbName := cr.ObjectMeta.Name + "-sentinel"
logger := pdbLogger(cr.Namespace, pdbName)
if pdbParams != nil && pdbParams.Enabled {
labels := getRedisLabels(cr.ObjectMeta.Name, sentinel, "sentinel", cr.ObjectMeta.GetLabels())
annotations := generateStatefulSetsAnots(cr.ObjectMeta, cr.Spec.KubernetesConfig.IgnoreAnnotations)
pdbMeta := generateObjectMetaInformation(pdbName, cr.Namespace, labels, annotations)
pdbDef := generateSentinelPodDisruptionBudgetDef(cr, "sentinel", pdbMeta, pdbParams)
return CreateOrUpdatePodDisruptionBudget(pdbDef)
return CreateOrUpdatePodDisruptionBudget(pdbDef, cl)
} else {
// Check if one exists, and delete it.
_, err := GetPodDisruptionBudget(cr.Namespace, pdbName)
_, err := GetPodDisruptionBudget(cr.Namespace, pdbName, cl)
if err == nil {
return deletePodDisruptionBudget(cr.Namespace, pdbName)
return deletePodDisruptionBudget(cr.Namespace, pdbName, cl)
} else if err != nil && errors.IsNotFound(err) {
logger.V(1).Info("Reconciliation Successful, no PodDisruptionBudget Found.")
// Its ok if its not found, as we're deleting anyway
Expand Down Expand Up @@ -116,24 +117,24 @@ func generateSentinelPodDisruptionBudgetDef(cr *redisv1beta2.RedisSentinel, role
}

// CreateOrUpdateService method will create or update Redis service
func CreateOrUpdatePodDisruptionBudget(pdbDef *policyv1.PodDisruptionBudget) error {
func CreateOrUpdatePodDisruptionBudget(pdbDef *policyv1.PodDisruptionBudget, cl kubernetes.Interface) error {
logger := pdbLogger(pdbDef.Namespace, pdbDef.Name)
storedPDB, err := GetPodDisruptionBudget(pdbDef.Namespace, pdbDef.Name)
storedPDB, err := GetPodDisruptionBudget(pdbDef.Namespace, pdbDef.Name, cl)
if err != nil {
if err := patch.DefaultAnnotator.SetLastAppliedAnnotation(pdbDef); err != nil { //nolint
logger.Error(err, "Unable to patch redis PodDisruptionBudget with comparison object")
return err
}
if errors.IsNotFound(err) {
return createPodDisruptionBudget(pdbDef.Namespace, pdbDef)
return createPodDisruptionBudget(pdbDef.Namespace, pdbDef, cl)
}
return err
}
return patchPodDisruptionBudget(storedPDB, pdbDef, pdbDef.Namespace)
return patchPodDisruptionBudget(storedPDB, pdbDef, pdbDef.Namespace, cl)
}

// patchPodDisruptionBudget will patch Redis Kubernetes PodDisruptionBudgets
func patchPodDisruptionBudget(storedPdb *policyv1.PodDisruptionBudget, newPdb *policyv1.PodDisruptionBudget, namespace string) error {
func patchPodDisruptionBudget(storedPdb *policyv1.PodDisruptionBudget, newPdb *policyv1.PodDisruptionBudget, namespace string, cl kubernetes.Interface) error {
logger := pdbLogger(namespace, storedPdb.Name)
// We want to try and keep this atomic as possible.
newPdb.ResourceVersion = storedPdb.ResourceVersion
Expand Down Expand Up @@ -169,20 +170,15 @@ func patchPodDisruptionBudget(storedPdb *policyv1.PodDisruptionBudget, newPdb *p
logger.Error(err, "Unable to patch redis PodDisruptionBudget with comparison object")
return err
}
return updatePodDisruptionBudget(namespace, newPdb)
return updatePodDisruptionBudget(namespace, newPdb, cl)
}
return nil
}

// createPodDisruptionBudget is a method to create PodDisruptionBudgets in Kubernetes
func createPodDisruptionBudget(namespace string, pdb *policyv1.PodDisruptionBudget) error {
func createPodDisruptionBudget(namespace string, pdb *policyv1.PodDisruptionBudget, cl kubernetes.Interface) error {
logger := pdbLogger(namespace, pdb.Name)
client, err := GenerateK8sClient(GenerateK8sConfig)
if err != nil {
logger.Error(err, "Could not generate kubernetes client")
return err
}
_, err = client.PolicyV1().PodDisruptionBudgets(namespace).Create(context.TODO(), pdb, metav1.CreateOptions{})
_, err := cl.PolicyV1().PodDisruptionBudgets(namespace).Create(context.TODO(), pdb, metav1.CreateOptions{})
if err != nil {
logger.Error(err, "Redis PodDisruptionBudget creation failed")
return err
Expand All @@ -192,14 +188,9 @@ func createPodDisruptionBudget(namespace string, pdb *policyv1.PodDisruptionBudg
}

// updatePodDisruptionBudget is a method to update PodDisruptionBudgets in Kubernetes
func updatePodDisruptionBudget(namespace string, pdb *policyv1.PodDisruptionBudget) error {
func updatePodDisruptionBudget(namespace string, pdb *policyv1.PodDisruptionBudget, cl kubernetes.Interface) error {
logger := pdbLogger(namespace, pdb.Name)
client, err := GenerateK8sClient(GenerateK8sConfig)
if err != nil {
logger.Error(err, "Could not generate kubernetes client")
return err
}
_, err = client.PolicyV1().PodDisruptionBudgets(namespace).Update(context.TODO(), pdb, metav1.UpdateOptions{})
_, err := cl.PolicyV1().PodDisruptionBudgets(namespace).Update(context.TODO(), pdb, metav1.UpdateOptions{})
if err != nil {
logger.Error(err, "Redis PodDisruptionBudget update failed")
return err
Expand All @@ -209,14 +200,9 @@ func updatePodDisruptionBudget(namespace string, pdb *policyv1.PodDisruptionBudg
}

// deletePodDisruptionBudget is a method to delete PodDisruptionBudgets in Kubernetes
func deletePodDisruptionBudget(namespace string, pdbName string) error {
func deletePodDisruptionBudget(namespace string, pdbName string, cl kubernetes.Interface) error {
logger := pdbLogger(namespace, pdbName)
client, err := GenerateK8sClient(GenerateK8sConfig)
if err != nil {
logger.Error(err, "Could not generate kubernetes client")
return err
}
err = client.PolicyV1().PodDisruptionBudgets(namespace).Delete(context.TODO(), pdbName, metav1.DeleteOptions{})
err := cl.PolicyV1().PodDisruptionBudgets(namespace).Delete(context.TODO(), pdbName, metav1.DeleteOptions{})
if err != nil {
logger.Error(err, "Redis PodDisruption deletion failed")
return err
Expand All @@ -226,17 +212,12 @@ func deletePodDisruptionBudget(namespace string, pdbName string) error {
}

// GetPodDisruptionBudget is a method to get PodDisruptionBudgets in Kubernetes
func GetPodDisruptionBudget(namespace string, pdb string) (*policyv1.PodDisruptionBudget, error) {
func GetPodDisruptionBudget(namespace string, pdb string, cl kubernetes.Interface) (*policyv1.PodDisruptionBudget, error) {
logger := pdbLogger(namespace, pdb)
client, err := GenerateK8sClient(GenerateK8sConfig)
if err != nil {
logger.Error(err, "Could not generate kubernetes client")
return nil, err
}
getOpts := metav1.GetOptions{
TypeMeta: generateMetaInformation("PodDisruptionBudget", "policy/v1"),
}
pdbInfo, err := client.PolicyV1().PodDisruptionBudgets(namespace).Get(context.TODO(), pdb, getOpts)
pdbInfo, err := cl.PolicyV1().PodDisruptionBudgets(namespace).Get(context.TODO(), pdb, getOpts)
if err != nil {
logger.V(1).Info("Redis PodDisruptionBudget get action failed")
return nil, err
Expand Down
Loading

0 comments on commit 7665d4a

Please sign in to comment.