Skip to content

Commit

Permalink
Merge pull request spotahome#489 from jiuker/feature_resize_pvc
Browse files Browse the repository at this point in the history
[feature]resize pvc
  • Loading branch information
ese authored Dec 12, 2022
2 parents 7aa06e2 + e11885e commit ea7e74e
Show file tree
Hide file tree
Showing 2 changed files with 167 additions and 0 deletions.
63 changes: 63 additions & 0 deletions service/k8s/statefulset.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@ package k8s
import (
"context"
"fmt"
"strconv"
"strings"

"k8s.io/apimachinery/pkg/labels"

"github.com/spotahome/redis-operator/operator/redisfailover/util"

appsv1 "k8s.io/api/apps/v1"
Expand Down Expand Up @@ -107,6 +110,66 @@ func (s *StatefulSetService) CreateOrUpdateStatefulSet(namespace string, statefu
// namespace is our spec(https://github.com/kubernetes/community/blob/master/contributors/devel/api-conventions.md#concurrency-control-and-consistency),
// we will replace the current namespace state.
statefulSet.ResourceVersion = storedStatefulSet.ResourceVersion
// resize pvc
// 1.Get the data already stored internally
// 2.Get the desired data
// 3.Start querying the pvc list when you find data inconsistencies
// 3.1 Comparison using real pvc capacity and desired data
// 3.1.1 Update if you find inconsistencies
// 3.2 Writing successful updates to internal
// 4. Set to old VolumeClaimTemplates to update.Prevent update error reporting
// 5. Set to old annotations to update
annotations := storedStatefulSet.Annotations
if annotations == nil {
annotations = map[string]string{
"storageCapacity": "0",
}
}
storedCapacity, _ := strconv.ParseInt(annotations["storageCapacity"], 0, 64)
if len(statefulSet.Spec.VolumeClaimTemplates) != 0 {
stateCapacity := statefulSet.Spec.VolumeClaimTemplates[0].Spec.Resources.Requests.Storage().Value()
if storedCapacity != stateCapacity {
rfName := strings.TrimPrefix(storedStatefulSet.Name, "rfr-")
listOpt := metav1.ListOptions{
LabelSelector: labels.FormatLabels(
map[string]string{
"app.kubernetes.io/component": "redis",
"app.kubernetes.io/name": strings.TrimPrefix(storedStatefulSet.Name, "rfr-"),
"app.kubernetes.io/part-of": "redis-failover",
},
),
}
pvcs, err := s.kubeClient.CoreV1().PersistentVolumeClaims(storedStatefulSet.Namespace).List(context.Background(), listOpt)
if err != nil {
return err
}
updateFailed := false
realUpdate := false
for _, pvc := range pvcs.Items {
realCapacity := pvc.Spec.Resources.Requests.Storage().Value()
if realCapacity != stateCapacity {
realUpdate = true
pvc.Spec.Resources.Requests = statefulSet.Spec.VolumeClaimTemplates[0].Spec.Resources.Requests
_, err = s.kubeClient.CoreV1().PersistentVolumeClaims(storedStatefulSet.Namespace).Update(context.Background(), &pvc, metav1.UpdateOptions{})
if err != nil {
updateFailed = true
s.logger.WithField("namespace", namespace).WithField("pvc", pvc.Name).Warningf("resize pvc failed:%s", err.Error())
}
}
}
if !updateFailed && len(pvcs.Items) != 0 {
annotations["storageCapacity"] = fmt.Sprintf("%d", stateCapacity)
storedStatefulSet.Annotations = annotations
if realUpdate {
s.logger.WithField("namespace", namespace).WithField("statefulSet", statefulSet.Name).Infof("resize statefulset pvcs from %d to %d Success", storedCapacity, stateCapacity)
} else {
s.logger.WithField("namespace", namespace).WithField("pvc", rfName).Warningf("set annotations,resize nothing")
}
}
}
}
// set stored.volumeClaimTemplates
statefulSet.Spec.VolumeClaimTemplates = storedStatefulSet.Spec.VolumeClaimTemplates
statefulSet.Annotations = util.MergeAnnotations(storedStatefulSet.Annotations, statefulSet.Annotations)
return s.UpdateStatefulSet(namespace, statefulSet)
}
Expand Down
104 changes: 104 additions & 0 deletions service/k8s/statefulset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ import (
"errors"
"testing"

"k8s.io/apimachinery/pkg/api/resource"

v1 "k8s.io/api/core/v1"

"github.com/stretchr/testify/assert"
appsv1 "k8s.io/api/apps/v1"
kubeerrors "k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -116,4 +120,104 @@ func TestStatefulSetServiceGetCreateOrUpdate(t *testing.T) {
}
})
}
// test resize pvc
{
t.Run("test_Resize_Pvc", func(t *testing.T) {
assert := assert.New(t)
beforeSts := &appsv1.StatefulSet{
ObjectMeta: metav1.ObjectMeta{
Name: "teststatefulSet1",
ResourceVersion: "10",
},
Spec: appsv1.StatefulSetSpec{
VolumeClaimTemplates: []v1.PersistentVolumeClaim{
{
Spec: v1.PersistentVolumeClaimSpec{
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceStorage: resource.MustParse("0.5Gi"),
},
},
},
},
},
},
}
afterSts := &appsv1.StatefulSet{
ObjectMeta: metav1.ObjectMeta{
Name: "teststatefulSet1",
ResourceVersion: "10",
},
Spec: appsv1.StatefulSetSpec{
VolumeClaimTemplates: []v1.PersistentVolumeClaim{
{
Spec: v1.PersistentVolumeClaimSpec{
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceStorage: resource.MustParse("1Gi"),
},
},
},
},
},
},
}
pvcList := &v1.PersistentVolumeClaimList{
Items: []v1.PersistentVolumeClaim{
{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"app.kubernetes.io/component": "redis",
"app.kubernetes.io/name": "teststatefulSet1",
"app.kubernetes.io/part-of": "redis-failover",
},
},
Spec: v1.PersistentVolumeClaimSpec{
VolumeName: "vol-1",
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceStorage: resource.MustParse("0.5Gi"),
},
},
},
},
// resized already
{
Spec: v1.PersistentVolumeClaimSpec{
VolumeName: "vol-2",
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceStorage: resource.MustParse("1Gi"),
},
},
},
},
},
}
// Mock.
mcli := &kubernetes.Clientset{}
mcli.AddReactor("get", "statefulsets", func(action kubetesting.Action) (bool, runtime.Object, error) {
return true, beforeSts, nil
})
mcli.AddReactor("list", "persistentvolumeclaims", func(action kubetesting.Action) (handled bool, ret runtime.Object, err error) {
return true, pvcList, nil
})
mcli.AddReactor("update", "persistentvolumeclaims", func(action kubetesting.Action) (handled bool, ret runtime.Object, err error) {
// update pvc[0]
pvcList.Items[0] = *action.(kubetesting.UpdateActionImpl).Object.(*v1.PersistentVolumeClaim)
return true, action.(kubetesting.UpdateActionImpl).Object, nil
})
service := k8s.NewStatefulSetService(mcli, log.Dummy, metrics.Dummy)
err := service.CreateOrUpdateStatefulSet(testns, afterSts)
assert.NoError(err)
assert.Equal(pvcList.Items[0].Spec.Resources, pvcList.Items[1].Spec.Resources)
// should not call update
mcli.AddReactor("update", "persistentvolumeclaims", func(action kubetesting.Action) (handled bool, ret runtime.Object, err error) {
panic("shouldn't call update")
})
service = k8s.NewStatefulSetService(mcli, log.Dummy, metrics.Dummy)
err = service.CreateOrUpdateStatefulSet(testns, afterSts)
assert.NoError(err)
})
}
}

0 comments on commit ea7e74e

Please sign in to comment.