fix: reduce the impact of Redis cluster intermediate states #1178

Merged · 1 commit · Dec 21, 2024
53 changes: 28 additions & 25 deletions pkg/controllers/rediscluster/rediscluster_controller.go
@@ -73,37 +73,40 @@
 	}
 
 	// Check if the cluster is downscaled
-	if leaderCount := k8sutils.CheckRedisNodeCount(ctx, r.K8sClient, instance, "leader"); leaderReplicas < leaderCount {
-		logger.Info("Redis cluster is downscaling...", "Current.LeaderReplicas", leaderCount, "Desired.LeaderReplicas", leaderReplicas)
-		for shardIdx := leaderCount - 1; shardIdx >= leaderReplicas; shardIdx-- {
-			logger.Info("Remove the shard", "Shard.Index", shardIdx)
-			// Imp if the last index of leader sts is not leader make it then
-			// check whether the redis is leader or not ?
-			// if not true then make it leader pod
-			if !(k8sutils.VerifyLeaderPod(ctx, r.K8sClient, instance)) {
-				// lastLeaderPod is slaving right now Make it the master Pod
-				// We have to bring a manual failover here to make it a leaderPod
-				// clusterFailover should also include the clusterReplicate since we have to map the followers to new leader
-				logger.Info("Cluster Failover is initiated", "Shard.Index", shardIdx)
-				if err = k8sutils.ClusterFailover(ctx, r.K8sClient, instance); err != nil {
-					logger.Error(err, "Failed to initiate cluster failover")
-					return intctrlutil.RequeueWithError(ctx, err, "")
-				}
-			}
-			// Step 1 Remove the Follower Node
-			k8sutils.RemoveRedisFollowerNodesFromCluster(ctx, r.K8sClient, instance)
-			// Step 2 Reshard the Cluster
-			k8sutils.ReshardRedisCluster(ctx, r.K8sClient, instance, true)
-		}
-		logger.Info("Redis cluster is downscaled... Rebalancing the cluster")
-		// Step 3 Rebalance the cluster
-		k8sutils.RebalanceRedisCluster(ctx, r.K8sClient, instance)
-		logger.Info("Redis cluster is downscaled... Rebalancing the cluster is done")
-		return intctrlutil.RequeueAfter(ctx, time.Second*10, "")
+	if leaderCount := r.GetStatefulSetReplicas(ctx, instance.Namespace, instance.Name+"-leader"); leaderReplicas < leaderCount {
+		if !(r.IsStatefulSetReady(ctx, instance.Namespace, instance.Name+"-leader") && r.IsStatefulSetReady(ctx, instance.Namespace, instance.Name+"-follower")) {
+			return intctrlutil.Reconciled()
+		}
+
+		if masterCount := k8sutils.CheckRedisNodeCount(ctx, r.K8sClient, instance, "leader"); masterCount == leaderCount {
+			logger.Info("Redis cluster is downscaling...", "Current.LeaderReplicas", leaderCount, "Desired.LeaderReplicas", leaderReplicas)
+			for shardIdx := leaderCount - 1; shardIdx >= leaderReplicas; shardIdx-- {
+				logger.Info("Remove the shard", "Shard.Index", shardIdx)
+				// Imp if the last index of leader sts is not leader make it then
+				// check whether the redis is leader or not ?
+				// if not true then make it leader pod
+				if !(k8sutils.VerifyLeaderPod(ctx, r.K8sClient, instance, shardIdx)) {
+					// lastLeaderPod is slaving right now Make it the master Pod
+					// We have to bring a manual failover here to make it a leaderPod
+					// clusterFailover should also include the clusterReplicate since we have to map the followers to new leader
+					logger.Info("Cluster Failover is initiated", "Shard.Index", shardIdx)
+					if err = k8sutils.ClusterFailover(ctx, r.K8sClient, instance, shardIdx); err != nil {
+						logger.Error(err, "Failed to initiate cluster failover")
+						return intctrlutil.RequeueWithError(ctx, err, "")
+					}
+				}
+				// Step 1 Remove the Follower Node
+				k8sutils.RemoveRedisFollowerNodesFromCluster(ctx, r.K8sClient, instance, shardIdx)
+				// Step 2 Reshard the Cluster
+				k8sutils.ReshardRedisCluster(ctx, r.K8sClient, instance, shardIdx, true)
+			}
+			logger.Info("Redis cluster is downscaled... Rebalancing the cluster")
+			// Step 3 Rebalance the cluster
+			k8sutils.RebalanceRedisCluster(ctx, r.K8sClient, instance)
+			logger.Info("Redis cluster is downscaled... Rebalancing the cluster is done")
+			return intctrlutil.RequeueAfter(ctx, time.Second*10, "")
+		} else {
+			logger.Info("masterCount is not equal to leader statefulset replicas,skip downscale", "masterCount", masterCount, "leaderReplicas", leaderReplicas)
+		}
 	}
 
 	// Mark the cluster status as initializing if there are no leader or follower nodes
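The net effect of this hunk: the downscale decision is now driven by the StatefulSet's declared replica count rather than by the live Redis node count, and the shard-removal loop only runs once both StatefulSets are ready and the live master count agrees with the declared pod count. A minimal sketch of that guard, pulled out into a standalone helper for readability (the receiver type and helper name are illustrative; the PR itself inlines this logic in Reconcile):

```go
// Sketch only: condenses the guard added above. Assumes the reconciler
// fields and helper signatures shown in the diff; not a drop-in excerpt.
func (r *RedisClusterReconciler) okToDownscale(ctx context.Context, instance *redisv1beta2.RedisCluster, leaderReplicas int32) bool {
	leader := instance.Name + "-leader"

	// Desired shape comes from the StatefulSet spec, not from Redis itself.
	leaderCount := r.GetStatefulSetReplicas(ctx, instance.Namespace, leader)
	if leaderReplicas >= leaderCount {
		return false // no downscale pending
	}

	// Guard 1: never act while pods are still being created or terminated.
	if !(r.IsStatefulSetReady(ctx, instance.Namespace, leader) &&
		r.IsStatefulSetReady(ctx, instance.Namespace, instance.Name+"-follower")) {
		return false
	}

	// Guard 2: only act when the live master count agrees with the declared
	// pod count; a mismatch means the cluster is mid-transition, so skip
	// this pass and let a later reconcile retry.
	return k8sutils.CheckRedisNodeCount(ctx, r.K8sClient, instance, "leader") == leaderCount
}
```

Either failed guard simply ends the pass, so nothing is failed over or resharded against a topology that is about to change.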
18 changes: 8 additions & 10 deletions pkg/k8sutils/cluster-scaling.go
@@ -15,12 +15,11 @@
 // ReshardRedisCluster transfer the slots from the last node to the first node.
 //
 // NOTE: when all slot been transferred, the node become slave of the first master node.
-func ReshardRedisCluster(ctx context.Context, client kubernetes.Interface, cr *redisv1beta2.RedisCluster, remove bool) {
+func ReshardRedisCluster(ctx context.Context, client kubernetes.Interface, cr *redisv1beta2.RedisCluster, shardIdx int32, remove bool) {
 	redisClient := configureRedisClient(ctx, client, cr, cr.ObjectMeta.Name+"-leader-0")
 	defer redisClient.Close()
 
 	var cmd []string
-	currentRedisCount := CheckRedisNodeCount(ctx, client, cr, "leader")
 
 	// Transfer Pod details
 	transferPOD := RedisDetails{
@@ -29,7 +28,7 @@
 	}
 	// Remove POD details
 	removePOD := RedisDetails{
-		PodName:   cr.Name + "-leader-" + strconv.Itoa(int(currentRedisCount)-1),
+		PodName:   cr.Name + "-leader-" + strconv.Itoa(int(shardIdx)),
 		Namespace: cr.Namespace,
 	}
 	cmd = []string{"redis-cli", "--cluster", "reshard"}
@@ -274,18 +273,17 @@
 }
 
 // Remove redis follower node would remove all follower nodes of last leader node using redis-cli
-func RemoveRedisFollowerNodesFromCluster(ctx context.Context, client kubernetes.Interface, cr *redisv1beta2.RedisCluster) {
+func RemoveRedisFollowerNodesFromCluster(ctx context.Context, client kubernetes.Interface, cr *redisv1beta2.RedisCluster, shardIdx int32) {
 	var cmd []string
 	redisClient := configureRedisClient(ctx, client, cr, cr.ObjectMeta.Name+"-leader-0")
 	defer redisClient.Close()
-	currentRedisCount := CheckRedisNodeCount(ctx, client, cr, "leader")
 
 	existingPod := RedisDetails{
 		PodName:   cr.ObjectMeta.Name + "-leader-0",
 		Namespace: cr.Namespace,
 	}
 	lastLeaderPod := RedisDetails{
-		PodName:   cr.ObjectMeta.Name + "-leader-" + strconv.Itoa(int(currentRedisCount)-1),
+		PodName:   cr.ObjectMeta.Name + "-leader-" + strconv.Itoa(int(shardIdx)),
 		Namespace: cr.Namespace,
 	}
 
@@ -365,8 +363,8 @@
 }
 
 // verifyLeaderPod return true if the pod is leader/master
-func VerifyLeaderPod(ctx context.Context, client kubernetes.Interface, cr *redisv1beta2.RedisCluster) bool {
-	podName := cr.Name + "-leader-" + strconv.Itoa(int(CheckRedisNodeCount(ctx, client, cr, "leader"))-1)
+func VerifyLeaderPod(ctx context.Context, client kubernetes.Interface, cr *redisv1beta2.RedisCluster, leadIndex int32) bool {
+	podName := cr.Name + "-leader-" + strconv.Itoa(int(leadIndex))
 
 	redisClient := configureRedisClient(ctx, client, cr, podName)
 	defer redisClient.Close()
@@ -391,8 +389,8 @@
 	return false
 }
 
-func ClusterFailover(ctx context.Context, client kubernetes.Interface, cr *redisv1beta2.RedisCluster) error {
-	slavePodName := cr.Name + "-leader-" + strconv.Itoa(int(CheckRedisNodeCount(ctx, client, cr, "leader"))-1)
+func ClusterFailover(ctx context.Context, client kubernetes.Interface, cr *redisv1beta2.RedisCluster, shardIdx int32) error {
+	slavePodName := cr.Name + "-leader-" + strconv.Itoa(int(shardIdx))
 	// cmd = redis-cli cluster failover -a <pass>
 	var cmd []string
 	pod := RedisDetails{
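The common thread across these four hunks: ReshardRedisCluster, RemoveRedisFollowerNodesFromCluster, VerifyLeaderPod, and ClusterFailover each used to re-derive their target pod from the live leader count, so the steps of removing one shard could resolve to different pods if that count moved between calls. Passing the controller's loop index pins them all to the same pod. A small sketch of the naming change (the helper below is hypothetical, for illustration only):

```go
// Hypothetical helper, not part of the diff: shows how the target pod
// name is derived after this PR.
func leaderPodName(cr *redisv1beta2.RedisCluster, shardIdx int32) string {
	// Before, each helper computed the index on its own from live state:
	//   strconv.Itoa(int(CheckRedisNodeCount(ctx, client, cr, "leader")) - 1)
	// which can drift while nodes join or leave mid-scale.
	//
	// After, the downscale loop owns the index, so failover, follower
	// removal, and resharding in one iteration all address the same shard.
	return cr.Name + "-leader-" + strconv.Itoa(int(shardIdx))
}
```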
12 changes: 12 additions & 0 deletions pkg/k8sutils/statefulset.go
@@ -26,6 +26,7 @@
 
 type StatefulSet interface {
 	IsStatefulSetReady(ctx context.Context, namespace, name string) bool
+	GetStatefulSetReplicas(ctx context.Context, namespace, name string) int32
 }
 
 type StatefulSetService struct {
@@ -76,6 +77,17 @@
 	return true
 }
 
+func (s *StatefulSetService) GetStatefulSetReplicas(ctx context.Context, namespace, name string) int32 {
+	sts, err := s.kubeClient.AppsV1().StatefulSets(namespace).Get(ctx, name, metav1.GetOptions{})
+	if err != nil {
+		return 0
+	}
+	if sts.Spec.Replicas == nil {
+		return 0
+	}
+	return *sts.Spec.Replicas
+}
+
 const (
 	redisExporterContainer = "redis-exporter"
 )
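GetStatefulSetReplicas reads the declared (.spec.replicas) count and collapses every failure to 0, which the downscale check above treats as "nothing pending", so a transient API error fails safe rather than triggering a scale-down. Codecov flags the new lines as untested; a minimal in-package test sketch with a fake clientset (hypothetical, not part of this PR; it assumes direct access to the unexported kubeClient field, whereas the real package may expose a constructor):

```go
package k8sutils

import (
	"context"
	"testing"

	appsv1 "k8s.io/api/apps/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes/fake"
)

func int32Ptr(i int32) *int32 { return &i }

// Hypothetical test: exercises the helper against an in-memory API server.
func TestGetStatefulSetReplicas(t *testing.T) {
	client := fake.NewSimpleClientset(&appsv1.StatefulSet{
		ObjectMeta: metav1.ObjectMeta{Name: "redis-leader", Namespace: "default"},
		Spec:       appsv1.StatefulSetSpec{Replicas: int32Ptr(3)},
	})
	s := &StatefulSetService{kubeClient: client}

	if got := s.GetStatefulSetReplicas(context.TODO(), "default", "redis-leader"); got != 3 {
		t.Fatalf("expected 3 replicas, got %d", got)
	}
	// A missing StatefulSet (or any API error) reads as zero replicas,
	// which the controller interprets as "no downscale pending".
	if got := s.GetStatefulSetReplicas(context.TODO(), "default", "absent"); got != 0 {
		t.Fatalf("expected 0 for a missing StatefulSet, got %d", got)
	}
}
```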