diff --git a/.gitignore b/.gitignore index 692042ff4..a9b78ec1b 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ /bin .bash_history .vscode +.idea/ diff --git a/mocks/service/k8s/Services.go b/mocks/service/k8s/Services.go index b7ab801cd..a88132a9f 100644 --- a/mocks/service/k8s/Services.go +++ b/mocks/service/k8s/Services.go @@ -819,6 +819,20 @@ func (_m *Services) UpdatePodDisruptionBudget(namespace string, podDisruptionBud return r0 } +// UpdatePodLabels provides a mock function with given fields: namespace, podName, labels +func (_m *Services) UpdatePodLabels(namespace string, podName string, labels map[string]string) error { + ret := _m.Called(namespace, podName, labels) + + var r0 error + if rf, ok := ret.Get(0).(func(string, string, map[string]string) error); ok { + r0 = rf(namespace, podName, labels) + } else { + r0 = ret.Error(0) + } + + return r0 +} + // UpdateRole provides a mock function with given fields: namespace, role func (_m *Services) UpdateRole(namespace string, role *rbacv1.Role) error { ret := _m.Called(namespace, role) diff --git a/operator/redisfailover/service/check.go b/operator/redisfailover/service/check.go index 17ffbe2f0..f5d4a7f0f 100644 --- a/operator/redisfailover/service/check.go +++ b/operator/redisfailover/service/check.go @@ -74,9 +74,27 @@ func (r *RedisFailoverChecker) CheckSentinelNumber(rf *redisfailoverv1.RedisFail return nil } +func (r *RedisFailoverChecker) setMasterLabelIfNecessary(namespace string, pod corev1.Pod) error { + for labelKey, labelValue := range pod.ObjectMeta.Labels { + if labelKey == redisRoleLabelKey && labelValue == redisRoleLabelMaster { + return nil + } + } + return r.k8sService.UpdatePodLabels(namespace, pod.ObjectMeta.Name, generateRedisMasterRoleLabel()) +} + +func (r *RedisFailoverChecker) setSlaveLabelIfNecessary(namespace string, pod corev1.Pod) error { + for labelKey, labelValue := range pod.ObjectMeta.Labels { + if labelKey == redisRoleLabelKey && labelValue == redisRoleLabelSlave { + return nil + } + } + return r.k8sService.UpdatePodLabels(namespace, pod.ObjectMeta.Name, generateRedisSlaveRoleLabel()) +} + // CheckAllSlavesFromMaster controlls that all slaves have the same master (the real one) func (r *RedisFailoverChecker) CheckAllSlavesFromMaster(master string, rf *redisfailoverv1.RedisFailover) error { - rips, err := r.GetRedisesIPs(rf) + rps, err := r.k8sService.GetStatefulSetPods(rf.Namespace, GetRedisName(rf)) if err != nil { return err } @@ -86,13 +104,26 @@ func (r *RedisFailoverChecker) CheckAllSlavesFromMaster(master string, rf *redis return err } - for _, rip := range rips { - slave, err := r.redisClient.GetSlaveOf(rip, password) + for _, rp := range rps.Items { + if rp.Status.PodIP == master { + err = r.setMasterLabelIfNecessary(rf.Namespace, rp) + if err != nil { + return err + } + } else { + err = r.setSlaveLabelIfNecessary(rf.Namespace, rp) + if err != nil { + return err + } + } + + slave, err := r.redisClient.GetSlaveOf(rp.Status.PodIP, password) if err != nil { + r.logger.Errorf("Get slave of master failed, maybe this node is not ready, pod ip: %s", rp.Status.PodIP) return err } if slave != "" && slave != master { - return fmt.Errorf("slave %s don't have the master %s, has %s", rip, master, slave) + return fmt.Errorf("slave %s don't have the master %s, has %s", rp.Status.PodIP, master, slave) } } return nil @@ -153,7 +184,8 @@ func (r *RedisFailoverChecker) GetMasterIP(rf *redisfailoverv1.RedisFailover) (s for _, rip := range rips { master, err := r.redisClient.IsMaster(rip, password) if err != nil { - return "", err + r.logger.Errorf("Get redis info failed, maybe this node is not ready, pod ip: %s", rip) + continue } if master { masters = append(masters, rip) @@ -182,7 +214,8 @@ func (r *RedisFailoverChecker) GetNumberMasters(rf *redisfailoverv1.RedisFailove for _, rip := range rips { master, err := r.redisClient.IsMaster(rip, password) if err != nil { - return nMasters, err + r.logger.Errorf("Get redis info failed, maybe this node is not ready, pod ip: %s", rip) + continue } if master { nMasters++ diff --git a/operator/redisfailover/service/check_test.go b/operator/redisfailover/service/check_test.go index aca3ff0b1..207cade29 100644 --- a/operator/redisfailover/service/check_test.go +++ b/operator/redisfailover/service/check_test.go @@ -5,6 +5,8 @@ import ( "testing" "time" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/assert" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" @@ -155,6 +157,7 @@ func TestCheckAllSlavesFromMasterGetStatefulSetError(t *testing.T) { ms := &mK8SService.Services{} ms.On("GetStatefulSetPods", namespace, rfservice.GetRedisName(rf)).Once().Return(nil, errors.New("")) + ms.On("UpdatePodLabels", namespace, mock.AnythingOfType("string"), mock.Anything).Once().Return(nil) mr := &mRedisService.Client{} checker := rfservice.NewRedisFailoverChecker(ms, mr, log.DummyLogger{}) @@ -181,6 +184,7 @@ func TestCheckAllSlavesFromMasterGetSlaveOfError(t *testing.T) { ms := &mK8SService.Services{} ms.On("GetStatefulSetPods", namespace, rfservice.GetRedisName(rf)).Once().Return(pods, nil) + ms.On("UpdatePodLabels", namespace, mock.AnythingOfType("string"), mock.Anything).Once().Return(nil) mr := &mRedisService.Client{} mr.On("GetSlaveOf", "", "").Once().Return("", errors.New("")) @@ -208,6 +212,7 @@ func TestCheckAllSlavesFromMasterDifferentMaster(t *testing.T) { ms := &mK8SService.Services{} ms.On("GetStatefulSetPods", namespace, rfservice.GetRedisName(rf)).Once().Return(pods, nil) + ms.On("UpdatePodLabels", namespace, mock.AnythingOfType("string"), mock.Anything).Once().Return(nil) mr := &mRedisService.Client{} mr.On("GetSlaveOf", "0.0.0.0", "").Once().Return("1.1.1.1", nil) @@ -235,6 +240,7 @@ func TestCheckAllSlavesFromMaster(t *testing.T) { ms := &mK8SService.Services{} ms.On("GetStatefulSetPods", namespace, rfservice.GetRedisName(rf)).Once().Return(pods, nil) + ms.On("UpdatePodLabels", namespace, mock.AnythingOfType("string"), mock.Anything).Once().Return(nil) mr := &mRedisService.Client{} mr.On("GetSlaveOf", "0.0.0.0", "").Once().Return("1.1.1.1", nil) @@ -578,7 +584,7 @@ func TestGetNumberMastersIsMasterError(t *testing.T) { checker := rfservice.NewRedisFailoverChecker(ms, mr, log.DummyLogger{}) _, err := checker.GetNumberMasters(rf) - assert.Error(err) + assert.NoError(err) } func TestGetNumberMasters(t *testing.T) { diff --git a/operator/redisfailover/service/client.go b/operator/redisfailover/service/client.go index adabc4182..becbd3afa 100644 --- a/operator/redisfailover/service/client.go +++ b/operator/redisfailover/service/client.go @@ -46,6 +46,22 @@ func generateSelectorLabels(component, name string) map[string]string { } } +func generateRedisDefaultRoleLabel() map[string]string { + return generateRedisSlaveRoleLabel() +} + +func generateRedisMasterRoleLabel() map[string]string { + return map[string]string{ + redisRoleLabelKey: redisRoleLabelMaster, + } +} + +func generateRedisSlaveRoleLabel() map[string]string { + return map[string]string{ + redisRoleLabelKey: redisRoleLabelSlave, + } +} + // EnsureSentinelService makes sure the sentinel service exists func (r *RedisFailoverKubeClient) EnsureSentinelService(rf *redisfailoverv1.RedisFailover, labels map[string]string, ownerRefs []metav1.OwnerReference) error { svc := generateSentinelService(rf, labels, ownerRefs) diff --git a/operator/redisfailover/service/constants.go b/operator/redisfailover/service/constants.go index b27d062cf..4325a8a76 100644 --- a/operator/redisfailover/service/constants.go +++ b/operator/redisfailover/service/constants.go @@ -26,3 +26,9 @@ const ( appLabel = "redis-failover" hostnameTopologyKey = "kubernetes.io/hostname" ) + +const ( + redisRoleLabelKey = "redisfailovers-role" + redisRoleLabelMaster = "master" + redisRoleLabelSlave = "slave" +) diff --git a/operator/redisfailover/service/generator.go b/operator/redisfailover/service/generator.go index 3c593d66f..1f37a098a 100644 --- a/operator/redisfailover/service/generator.go +++ b/operator/redisfailover/service/generator.go @@ -250,6 +250,8 @@ func generateRedisStatefulSet(rf *redisfailoverv1.RedisFailover, labels map[stri redisCommand := getRedisCommand(rf) selectorLabels := generateSelectorLabels(redisRoleName, rf.Name) labels = util.MergeLabels(labels, selectorLabels) + labels = util.MergeLabels(labels, generateRedisDefaultRoleLabel()) + volumeMounts := getRedisVolumeMounts(rf) volumes := getRedisVolumes(rf) terminationGracePeriodSeconds := getTerminationGracePeriodSeconds(rf) diff --git a/operator/redisfailover/service/heal.go b/operator/redisfailover/service/heal.go index a0caf34b7..acda1da5b 100644 --- a/operator/redisfailover/service/heal.go +++ b/operator/redisfailover/service/heal.go @@ -9,6 +9,7 @@ import ( "github.com/spotahome/redis-operator/log" "github.com/spotahome/redis-operator/service/k8s" "github.com/spotahome/redis-operator/service/redis" + v1 "k8s.io/api/core/v1" ) // RedisFailoverHeal defines the interface able to fix the problems on the redis failovers @@ -41,13 +42,45 @@ func NewRedisFailoverHealer(k8sService k8s.Services, redisClient redis.Client, l } } +func (r *RedisFailoverHealer) setMasterLabelIfNecessary(namespace string, pod v1.Pod) error { + for labelKey, labelValue := range pod.ObjectMeta.Labels { + if labelKey == redisRoleLabelKey && labelValue == redisRoleLabelMaster { + return nil + } + } + return r.k8sService.UpdatePodLabels(namespace, pod.ObjectMeta.Name, generateRedisMasterRoleLabel()) +} + +func (r *RedisFailoverHealer) setSlaveLabelIfNecessary(namespace string, pod v1.Pod) error { + for labelKey, labelValue := range pod.ObjectMeta.Labels { + if labelKey == redisRoleLabelKey && labelValue == redisRoleLabelSlave { + return nil + } + } + return r.k8sService.UpdatePodLabels(namespace, pod.ObjectMeta.Name, generateRedisSlaveRoleLabel()) +} + func (r *RedisFailoverHealer) MakeMaster(ip string, rf *redisfailoverv1.RedisFailover) error { password, err := k8s.GetRedisPassword(r.k8sService, rf) if err != nil { return err } - return r.redisClient.MakeMaster(ip, password) + err = r.redisClient.MakeMaster(ip, password) + if err != nil { + return err + } + + rps, err := r.k8sService.GetStatefulSetPods(rf.Namespace, GetRedisName(rf)) + if err != nil { + return err + } + for _, rp := range rps.Items { + if rp.Status.PodIP == ip { + return r.setMasterLabelIfNecessary(rf.Namespace, rp) + } + } + return nil } // SetOldestAsMaster puts all redis to the same master, choosen by order of appearance @@ -73,14 +106,26 @@ func (r *RedisFailoverHealer) SetOldestAsMaster(rf *redisfailoverv1.RedisFailove newMasterIP := "" for _, pod := range ssp.Items { if newMasterIP == "" { - newMasterIP = pod.Status.PodIP - r.logger.Debugf("New master is %s with ip %s", pod.Name, newMasterIP) - if err := r.redisClient.MakeMaster(newMasterIP, password); err != nil { + r.logger.Infof("New master is %s with ip %s", pod.Name, pod.Status.PodIP) + if err := r.redisClient.MakeMaster(pod.Status.PodIP, password); err != nil { + r.logger.Errorf("Make new master failed, master ip: %s, error: %v", pod.Status.PodIP, err) + continue + } + + err = r.setMasterLabelIfNecessary(rf.Namespace, pod) + if err != nil { return err } + + newMasterIP = pod.Status.PodIP } else { - r.logger.Debugf("Making pod %s slave of %s", pod.Name, newMasterIP) + r.logger.Infof("Making pod %s slave of %s", pod.Name, newMasterIP) if err := r.redisClient.MakeSlaveOf(pod.Status.PodIP, newMasterIP, password); err != nil { + r.logger.Errorf("Make slave failed, slave pod ip: %s, master ip: %s, error: %v", pod.Status.PodIP, newMasterIP, err) + } + + err = r.setSlaveLabelIfNecessary(rf.Namespace, pod) + if err != nil { return err } } @@ -102,13 +147,25 @@ func (r *RedisFailoverHealer) SetMasterOnAll(masterIP string, rf *redisfailoverv for _, pod := range ssp.Items { if pod.Status.PodIP == masterIP { - r.logger.Debugf("Ensure pod %s is master", pod.Name) + r.logger.Infof("Ensure pod %s is master", pod.Name) if err := r.redisClient.MakeMaster(masterIP, password); err != nil { + r.logger.Errorf("Make master failed, master ip: %s, error: %v", masterIP, err) + return err + } + + err = r.setMasterLabelIfNecessary(rf.Namespace, pod) + if err != nil { return err } } else { - r.logger.Debugf("Making pod %s slave of %s", pod.Name, masterIP) + r.logger.Infof("Making pod %s slave of %s", pod.Name, masterIP) if err := r.redisClient.MakeSlaveOf(pod.Status.PodIP, masterIP, password); err != nil { + r.logger.Errorf("Make slave failed, slave ip: %s, master ip: %s, error: %v", pod.Status.PodIP, masterIP, err) + return err + } + + err = r.setSlaveLabelIfNecessary(rf.Namespace, pod) + if err != nil { return err } } diff --git a/operator/redisfailover/service/heal_test.go b/operator/redisfailover/service/heal_test.go index 61eff590a..5220a6621 100644 --- a/operator/redisfailover/service/heal_test.go +++ b/operator/redisfailover/service/heal_test.go @@ -5,6 +5,8 @@ import ( "testing" "time" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/assert" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -32,13 +34,14 @@ func TestSetOldestAsMasterNewMasterError(t *testing.T) { ms := &mK8SService.Services{} ms.On("GetStatefulSetPods", namespace, rfservice.GetRedisName(rf)).Once().Return(pods, nil) + ms.On("UpdatePodLabels", namespace, mock.AnythingOfType("string"), mock.Anything).Return(nil) mr := &mRedisService.Client{} mr.On("MakeMaster", "0.0.0.0", "").Once().Return(errors.New("")) healer := rfservice.NewRedisFailoverHealer(ms, mr, log.DummyLogger{}) err := healer.SetOldestAsMaster(rf) - assert.Error(err) + assert.NoError(err) } func TestSetOldestAsMaster(t *testing.T) { @@ -58,6 +61,7 @@ func TestSetOldestAsMaster(t *testing.T) { ms := &mK8SService.Services{} ms.On("GetStatefulSetPods", namespace, rfservice.GetRedisName(rf)).Once().Return(pods, nil) + ms.On("UpdatePodLabels", namespace, mock.AnythingOfType("string"), mock.Anything).Once().Return(nil) mr := &mRedisService.Client{} mr.On("MakeMaster", "0.0.0.0", "").Once().Return(nil) @@ -89,6 +93,7 @@ func TestSetOldestAsMasterMultiplePodsMakeSlaveOfError(t *testing.T) { ms := &mK8SService.Services{} ms.On("GetStatefulSetPods", namespace, rfservice.GetRedisName(rf)).Once().Return(pods, nil) + ms.On("UpdatePodLabels", namespace, mock.AnythingOfType("string"), mock.Anything).Return(nil) mr := &mRedisService.Client{} mr.On("MakeMaster", "0.0.0.0", "").Once().Return(nil) mr.On("MakeSlaveOf", "1.1.1.1", "0.0.0.0", "").Once().Return(errors.New("")) @@ -96,7 +101,7 @@ func TestSetOldestAsMasterMultiplePodsMakeSlaveOfError(t *testing.T) { healer := rfservice.NewRedisFailoverHealer(ms, mr, log.DummyLogger{}) err := healer.SetOldestAsMaster(rf) - assert.Error(err) + assert.NoError(err) } func TestSetOldestAsMasterMultiplePods(t *testing.T) { @@ -121,6 +126,7 @@ func TestSetOldestAsMasterMultiplePods(t *testing.T) { ms := &mK8SService.Services{} ms.On("GetStatefulSetPods", namespace, rfservice.GetRedisName(rf)).Once().Return(pods, nil) + ms.On("UpdatePodLabels", namespace, mock.AnythingOfType("string"), mock.Anything).Return(nil) mr := &mRedisService.Client{} mr.On("MakeMaster", "0.0.0.0", "").Once().Return(nil) mr.On("MakeSlaveOf", "1.1.1.1", "0.0.0.0", "").Once().Return(nil) @@ -163,6 +169,7 @@ func TestSetOldestAsMasterOrdering(t *testing.T) { ms := &mK8SService.Services{} ms.On("GetStatefulSetPods", namespace, rfservice.GetRedisName(rf)).Once().Return(pods, nil) + ms.On("UpdatePodLabels", namespace, mock.AnythingOfType("string"), mock.Anything).Return(nil) mr := &mRedisService.Client{} mr.On("MakeMaster", "1.1.1.1", "").Once().Return(nil) mr.On("MakeSlaveOf", "0.0.0.0", "1.1.1.1", "").Once().Return(nil) @@ -195,6 +202,7 @@ func TestSetMasterOnAllMakeMasterError(t *testing.T) { ms := &mK8SService.Services{} ms.On("GetStatefulSetPods", namespace, rfservice.GetRedisName(rf)).Once().Return(pods, nil) + ms.On("UpdatePodLabels", namespace, mock.AnythingOfType("string"), mock.Anything).Once().Return(nil) mr := &mRedisService.Client{} mr.On("MakeMaster", "0.0.0.0", "").Once().Return(errors.New("")) @@ -226,6 +234,7 @@ func TestSetMasterOnAllMakeSlaveOfError(t *testing.T) { ms := &mK8SService.Services{} ms.On("GetStatefulSetPods", namespace, rfservice.GetRedisName(rf)).Once().Return(pods, nil) + ms.On("UpdatePodLabels", namespace, mock.AnythingOfType("string"), mock.Anything).Return(nil) mr := &mRedisService.Client{} mr.On("MakeMaster", "0.0.0.0", "").Once().Return(nil) mr.On("MakeSlaveOf", "1.1.1.1", "0.0.0.0", "").Once().Return(errors.New("")) @@ -258,6 +267,7 @@ func TestSetMasterOnAll(t *testing.T) { ms := &mK8SService.Services{} ms.On("GetStatefulSetPods", namespace, rfservice.GetRedisName(rf)).Once().Return(pods, nil) + ms.On("UpdatePodLabels", namespace, mock.AnythingOfType("string"), mock.Anything).Return(nil) mr := &mRedisService.Client{} mr.On("MakeMaster", "0.0.0.0", "").Once().Return(nil) mr.On("MakeSlaveOf", "1.1.1.1", "0.0.0.0", "").Once().Return(nil) diff --git a/service/k8s/pod.go b/service/k8s/pod.go index 5e81847c6..6125557ba 100644 --- a/service/k8s/pod.go +++ b/service/k8s/pod.go @@ -2,6 +2,9 @@ package k8s import ( "context" + "encoding/json" + + "k8s.io/apimachinery/pkg/types" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" @@ -19,6 +22,7 @@ type Pod interface { CreateOrUpdatePod(namespace string, pod *corev1.Pod) error DeletePod(namespace string, name string) error ListPods(namespace string) (*corev1.PodList, error) + UpdatePodLabels(namespace, podName string, labels map[string]string) error } // PodService is the pod service implementation using API calls to kubernetes. @@ -85,3 +89,31 @@ func (p *PodService) DeletePod(namespace string, name string) error { func (p *PodService) ListPods(namespace string) (*corev1.PodList, error) { return p.kubeClient.CoreV1().Pods(namespace).List(context.TODO(), metav1.ListOptions{}) } + +//PatchStringValue specifies a patch operation for a string. +type PatchStringValue struct { + Op string `json:"op"` + Path string `json:"path"` + Value interface{} `json:"value"` +} + +func (p *PodService) UpdatePodLabels(namespace, podName string, labels map[string]string) error { + p.logger.Infof("Update pod label, namespace: %s, pod name: %s, labels: %v", namespace, podName, labels) + + var payloads []interface{} + for labelKey, labelValue := range labels { + payload := PatchStringValue{ + Op: "replace", + Path: "/metadata/labels/" + labelKey, + Value: labelValue, + } + payloads = append(payloads, payload) + } + payloadBytes, _ := json.Marshal(payloads) + + _, err := p.kubeClient.CoreV1().Pods(namespace).Patch(context.TODO(), podName, types.JSONPatchType, payloadBytes, metav1.PatchOptions{}) + if err != nil { + p.logger.Errorf("Update pod labels failed, namespace: %s, pod name: %s, error: %v", namespace, podName, err) + } + return err +}