Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(redis): Add pod label of redis role, to support Master/Slave model. #419

Merged
merged 8 commits into from
Aug 24, 2022
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
/bin
.bash_history
.vscode
.idea/
14 changes: 14 additions & 0 deletions mocks/service/k8s/Services.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

45 changes: 39 additions & 6 deletions operator/redisfailover/service/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,27 @@ func (r *RedisFailoverChecker) CheckSentinelNumber(rf *redisfailoverv1.RedisFail
return nil
}

func (r *RedisFailoverChecker) setMasterLabelIfNecessary(namespace string, pod corev1.Pod) error {
for labelKey, labelValue := range pod.ObjectMeta.Labels {
if labelKey == redisRoleLabelKey && labelValue == redisRoleLabelMaster {
return nil
}
}
return r.k8sService.UpdatePodLabels(namespace, pod.ObjectMeta.Name, generateRedisMasterRoleLabel())
}

func (r *RedisFailoverChecker) setSlaveLabelIfNecessary(namespace string, pod corev1.Pod) error {
for labelKey, labelValue := range pod.ObjectMeta.Labels {
if labelKey == redisRoleLabelKey && labelValue == redisRoleLabelSlave {
return nil
}
}
return r.k8sService.UpdatePodLabels(namespace, pod.ObjectMeta.Name, generateRedisSlaveRoleLabel())
}

// CheckAllSlavesFromMaster controlls that all slaves have the same master (the real one)
func (r *RedisFailoverChecker) CheckAllSlavesFromMaster(master string, rf *redisfailoverv1.RedisFailover) error {
rips, err := r.GetRedisesIPs(rf)
rps, err := r.k8sService.GetStatefulSetPods(rf.Namespace, GetRedisName(rf))
if err != nil {
return err
}
Expand All @@ -86,13 +104,26 @@ func (r *RedisFailoverChecker) CheckAllSlavesFromMaster(master string, rf *redis
return err
}

for _, rip := range rips {
slave, err := r.redisClient.GetSlaveOf(rip, password)
for _, rp := range rps.Items {
if rp.Status.PodIP == master {
err = r.setMasterLabelIfNecessary(rf.Namespace, rp)
if err != nil {
return err
}
} else {
err = r.setSlaveLabelIfNecessary(rf.Namespace, rp)
if err != nil {
return err
}
}

slave, err := r.redisClient.GetSlaveOf(rp.Status.PodIP, password)
if err != nil {
r.logger.Errorf("Get slave of master failed, maybe this node is not ready, pod ip: %s", rp.Status.PodIP)
return err
}
if slave != "" && slave != master {
return fmt.Errorf("slave %s don't have the master %s, has %s", rip, master, slave)
return fmt.Errorf("slave %s don't have the master %s, has %s", rp.Status.PodIP, master, slave)
}
}
return nil
Expand Down Expand Up @@ -153,7 +184,8 @@ func (r *RedisFailoverChecker) GetMasterIP(rf *redisfailoverv1.RedisFailover) (s
for _, rip := range rips {
master, err := r.redisClient.IsMaster(rip, password)
if err != nil {
return "", err
r.logger.Errorf("Get redis info failed, maybe this node is not ready, pod ip: %s", rip)
continue
}
if master {
masters = append(masters, rip)
Expand Down Expand Up @@ -182,7 +214,8 @@ func (r *RedisFailoverChecker) GetNumberMasters(rf *redisfailoverv1.RedisFailove
for _, rip := range rips {
master, err := r.redisClient.IsMaster(rip, password)
if err != nil {
return nMasters, err
r.logger.Errorf("Get redis info failed, maybe this node is not ready, pod ip: %s", rip)
continue
}
if master {
nMasters++
Expand Down
8 changes: 7 additions & 1 deletion operator/redisfailover/service/check_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"testing"
"time"

"github.com/stretchr/testify/mock"

"github.com/stretchr/testify/assert"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -155,6 +157,7 @@ func TestCheckAllSlavesFromMasterGetStatefulSetError(t *testing.T) {

ms := &mK8SService.Services{}
ms.On("GetStatefulSetPods", namespace, rfservice.GetRedisName(rf)).Once().Return(nil, errors.New(""))
ms.On("UpdatePodLabels", namespace, mock.AnythingOfType("string"), mock.Anything).Once().Return(nil)
mr := &mRedisService.Client{}

checker := rfservice.NewRedisFailoverChecker(ms, mr, log.DummyLogger{})
Expand All @@ -181,6 +184,7 @@ func TestCheckAllSlavesFromMasterGetSlaveOfError(t *testing.T) {

ms := &mK8SService.Services{}
ms.On("GetStatefulSetPods", namespace, rfservice.GetRedisName(rf)).Once().Return(pods, nil)
ms.On("UpdatePodLabels", namespace, mock.AnythingOfType("string"), mock.Anything).Once().Return(nil)
mr := &mRedisService.Client{}
mr.On("GetSlaveOf", "", "").Once().Return("", errors.New(""))

Expand Down Expand Up @@ -208,6 +212,7 @@ func TestCheckAllSlavesFromMasterDifferentMaster(t *testing.T) {

ms := &mK8SService.Services{}
ms.On("GetStatefulSetPods", namespace, rfservice.GetRedisName(rf)).Once().Return(pods, nil)
ms.On("UpdatePodLabels", namespace, mock.AnythingOfType("string"), mock.Anything).Once().Return(nil)
mr := &mRedisService.Client{}
mr.On("GetSlaveOf", "0.0.0.0", "").Once().Return("1.1.1.1", nil)

Expand Down Expand Up @@ -235,6 +240,7 @@ func TestCheckAllSlavesFromMaster(t *testing.T) {

ms := &mK8SService.Services{}
ms.On("GetStatefulSetPods", namespace, rfservice.GetRedisName(rf)).Once().Return(pods, nil)
ms.On("UpdatePodLabels", namespace, mock.AnythingOfType("string"), mock.Anything).Once().Return(nil)
mr := &mRedisService.Client{}
mr.On("GetSlaveOf", "0.0.0.0", "").Once().Return("1.1.1.1", nil)

Expand Down Expand Up @@ -578,7 +584,7 @@ func TestGetNumberMastersIsMasterError(t *testing.T) {
checker := rfservice.NewRedisFailoverChecker(ms, mr, log.DummyLogger{})

_, err := checker.GetNumberMasters(rf)
assert.Error(err)
assert.NoError(err)
}

func TestGetNumberMasters(t *testing.T) {
Expand Down
16 changes: 16 additions & 0 deletions operator/redisfailover/service/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,22 @@ func generateSelectorLabels(component, name string) map[string]string {
}
}

func generateRedisDefaultRoleLabel() map[string]string {
return generateRedisSlaveRoleLabel()
}

func generateRedisMasterRoleLabel() map[string]string {
return map[string]string{
redisRoleLabelKey: redisRoleLabelMaster,
}
}

func generateRedisSlaveRoleLabel() map[string]string {
return map[string]string{
redisRoleLabelKey: redisRoleLabelSlave,
}
}

// EnsureSentinelService makes sure the sentinel service exists
func (r *RedisFailoverKubeClient) EnsureSentinelService(rf *redisfailoverv1.RedisFailover, labels map[string]string, ownerRefs []metav1.OwnerReference) error {
svc := generateSentinelService(rf, labels, ownerRefs)
Expand Down
6 changes: 6 additions & 0 deletions operator/redisfailover/service/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,9 @@ const (
appLabel = "redis-failover"
hostnameTopologyKey = "kubernetes.io/hostname"
)

const (
redisRoleLabelKey = "redisfailovers-role"
redisRoleLabelMaster = "master"
redisRoleLabelSlave = "slave"
)
2 changes: 2 additions & 0 deletions operator/redisfailover/service/generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,8 @@ func generateRedisStatefulSet(rf *redisfailoverv1.RedisFailover, labels map[stri
redisCommand := getRedisCommand(rf)
selectorLabels := generateSelectorLabels(redisRoleName, rf.Name)
labels = util.MergeLabels(labels, selectorLabels)
labels = util.MergeLabels(labels, generateRedisDefaultRoleLabel())

volumeMounts := getRedisVolumeMounts(rf)
volumes := getRedisVolumes(rf)
terminationGracePeriodSeconds := getTerminationGracePeriodSeconds(rf)
Expand Down
71 changes: 64 additions & 7 deletions operator/redisfailover/service/heal.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/spotahome/redis-operator/log"
"github.com/spotahome/redis-operator/service/k8s"
"github.com/spotahome/redis-operator/service/redis"
v1 "k8s.io/api/core/v1"
)

// RedisFailoverHeal defines the interface able to fix the problems on the redis failovers
Expand Down Expand Up @@ -41,13 +42,45 @@ func NewRedisFailoverHealer(k8sService k8s.Services, redisClient redis.Client, l
}
}

func (r *RedisFailoverHealer) setMasterLabelIfNecessary(namespace string, pod v1.Pod) error {
for labelKey, labelValue := range pod.ObjectMeta.Labels {
if labelKey == redisRoleLabelKey && labelValue == redisRoleLabelMaster {
return nil
}
}
return r.k8sService.UpdatePodLabels(namespace, pod.ObjectMeta.Name, generateRedisMasterRoleLabel())
}

func (r *RedisFailoverHealer) setSlaveLabelIfNecessary(namespace string, pod v1.Pod) error {
for labelKey, labelValue := range pod.ObjectMeta.Labels {
if labelKey == redisRoleLabelKey && labelValue == redisRoleLabelSlave {
return nil
}
}
return r.k8sService.UpdatePodLabels(namespace, pod.ObjectMeta.Name, generateRedisSlaveRoleLabel())
}

func (r *RedisFailoverHealer) MakeMaster(ip string, rf *redisfailoverv1.RedisFailover) error {
password, err := k8s.GetRedisPassword(r.k8sService, rf)
if err != nil {
return err
}

return r.redisClient.MakeMaster(ip, password)
err = r.redisClient.MakeMaster(ip, password)
if err != nil {
return err
}

rps, err := r.k8sService.GetStatefulSetPods(rf.Namespace, GetRedisName(rf))
if err != nil {
return err
}
for _, rp := range rps.Items {
if rp.Status.PodIP == ip {
return r.setMasterLabelIfNecessary(rf.Namespace, rp)
}
}
return nil
}

// SetOldestAsMaster puts all redis to the same master, choosen by order of appearance
Expand All @@ -73,14 +106,26 @@ func (r *RedisFailoverHealer) SetOldestAsMaster(rf *redisfailoverv1.RedisFailove
newMasterIP := ""
for _, pod := range ssp.Items {
if newMasterIP == "" {
newMasterIP = pod.Status.PodIP
r.logger.Debugf("New master is %s with ip %s", pod.Name, newMasterIP)
if err := r.redisClient.MakeMaster(newMasterIP, password); err != nil {
r.logger.Infof("New master is %s with ip %s", pod.Name, pod.Status.PodIP)
if err := r.redisClient.MakeMaster(pod.Status.PodIP, password); err != nil {
r.logger.Errorf("Make new master failed, master ip: %s, error: %v", pod.Status.PodIP, err)
continue
}

err = r.setMasterLabelIfNecessary(rf.Namespace, pod)
if err != nil {
return err
}

newMasterIP = pod.Status.PodIP
} else {
r.logger.Debugf("Making pod %s slave of %s", pod.Name, newMasterIP)
r.logger.Infof("Making pod %s slave of %s", pod.Name, newMasterIP)
if err := r.redisClient.MakeSlaveOf(pod.Status.PodIP, newMasterIP, password); err != nil {
r.logger.Errorf("Make slave failed, slave pod ip: %s, master ip: %s, error: %v", pod.Status.PodIP, newMasterIP, err)
}

err = r.setSlaveLabelIfNecessary(rf.Namespace, pod)
if err != nil {
return err
}
}
Expand All @@ -102,13 +147,25 @@ func (r *RedisFailoverHealer) SetMasterOnAll(masterIP string, rf *redisfailoverv

for _, pod := range ssp.Items {
if pod.Status.PodIP == masterIP {
r.logger.Debugf("Ensure pod %s is master", pod.Name)
r.logger.Infof("Ensure pod %s is master", pod.Name)
if err := r.redisClient.MakeMaster(masterIP, password); err != nil {
r.logger.Errorf("Make master failed, master ip: %s, error: %v", masterIP, err)
return err
}

err = r.setMasterLabelIfNecessary(rf.Namespace, pod)
if err != nil {
return err
}
} else {
r.logger.Debugf("Making pod %s slave of %s", pod.Name, masterIP)
r.logger.Infof("Making pod %s slave of %s", pod.Name, masterIP)
if err := r.redisClient.MakeSlaveOf(pod.Status.PodIP, masterIP, password); err != nil {
r.logger.Errorf("Make slave failed, slave ip: %s, master ip: %s, error: %v", pod.Status.PodIP, masterIP, err)
return err
}

err = r.setSlaveLabelIfNecessary(rf.Namespace, pod)
if err != nil {
return err
}
}
Expand Down
Loading