Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feature]resize pvc #489

Merged
merged 10 commits into from
Dec 12, 2022
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 65 additions & 0 deletions service/k8s/statefulset.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@ package k8s
import (
"context"
"fmt"
"strconv"
"strings"

"k8s.io/apimachinery/pkg/labels"

"github.com/spotahome/redis-operator/operator/redisfailover/util"

appsv1 "k8s.io/api/apps/v1"
Expand Down Expand Up @@ -107,6 +110,68 @@ func (s *StatefulSetService) CreateOrUpdateStatefulSet(namespace string, statefu
// namespace is our spec(https://github.com/kubernetes/community/blob/master/contributors/devel/api-conventions.md#concurrency-control-and-consistency),
// we will replace the current namespace state.
statefulSet.ResourceVersion = storedStatefulSet.ResourceVersion
// resize pvc
// 1.Get the data already stored internally
// 2.Get the desired data
// 3.Start querying the pvc list when you find data inconsistencies
// 3.1 Comparison using real pvc capacity and desired data
// 3.1.1 Update if you find inconsistencies
// 3.2 Writing successful updates to internal
// 4. Set to old VolumeClaimTemplates to update.Prevent update error reporting
// 5. Set to old annotations to update
annotations := storedStatefulSet.Annotations
if annotations == nil {
annotations = map[string]string{
"storageCapacity": "0",
}
}
storedCapacity, _ := strconv.ParseInt(annotations["storageCapacity"], 0, 64)
if len(statefulSet.Spec.VolumeClaimTemplates) != 0 {
stateCapacity := statefulSet.Spec.VolumeClaimTemplates[0].Spec.Resources.Requests.Storage().Value()
if storedCapacity != stateCapacity {
rfName := strings.TrimPrefix(storedStatefulSet.Name, "rfr-")
listOpt := metav1.ListOptions{
LabelSelector: labels.FormatLabels(
map[string]string{
"app.kubernetes.io/component": "redis",
"app.kubernetes.io/name": strings.TrimPrefix(storedStatefulSet.Name, "rfr-"),
"app.kubernetes.io/part-of": "redis-failover",
},
),
}
pvcs, err := s.kubeClient.CoreV1().PersistentVolumeClaims(storedStatefulSet.Namespace).List(context.Background(), listOpt)
if err != nil {
return err
}
updateFailed := false
realUpdate := false
for _, pvc := range pvcs.Items {
realCapacity := pvc.Spec.Resources.Requests.Storage().Value()
if realCapacity != stateCapacity {
realUpdate = true
pvc.Spec.Resources.Requests = statefulSet.Spec.VolumeClaimTemplates[0].Spec.Resources.Requests
_, err = s.kubeClient.CoreV1().PersistentVolumeClaims(storedStatefulSet.Namespace).Update(context.Background(), &pvc, metav1.UpdateOptions{})
if err != nil {
if !updateFailed {
jiuker marked this conversation as resolved.
Show resolved Hide resolved
updateFailed = true
}
s.logger.WithField("namespace", namespace).WithField("pvc", rfName).Warningf("resize pvc failed:%s", err.Error())
jiuker marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
if !updateFailed && len(pvcs.Items) != 0 {
annotations["storageCapacity"] = fmt.Sprintf("%d", stateCapacity)
storedStatefulSet.Annotations = annotations
if realUpdate {
s.logger.WithField("namespace", namespace).WithField("pvc", rfName).Infof("resize pvc will resize from %d to %d Success", storedCapacity, stateCapacity)
jiuker marked this conversation as resolved.
Show resolved Hide resolved
} else {
s.logger.WithField("namespace", namespace).WithField("pvc", rfName).Warningf("set annotations,resize nothing")
}
jiuker marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
// set stored.volumeClaimTemplates
statefulSet.Spec.VolumeClaimTemplates = storedStatefulSet.Spec.VolumeClaimTemplates
statefulSet.Annotations = util.MergeAnnotations(storedStatefulSet.Annotations, statefulSet.Annotations)
return s.UpdateStatefulSet(namespace, statefulSet)
}
Expand Down
104 changes: 104 additions & 0 deletions service/k8s/statefulset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ import (
"errors"
"testing"

"k8s.io/apimachinery/pkg/api/resource"

v1 "k8s.io/api/core/v1"

"github.com/stretchr/testify/assert"
appsv1 "k8s.io/api/apps/v1"
kubeerrors "k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -116,4 +120,104 @@ func TestStatefulSetServiceGetCreateOrUpdate(t *testing.T) {
}
})
}
// test resize pvc
{
t.Run("test_Resize_Pvc", func(t *testing.T) {
assert := assert.New(t)
beforeSts := &appsv1.StatefulSet{
ObjectMeta: metav1.ObjectMeta{
Name: "teststatefulSet1",
ResourceVersion: "10",
},
Spec: appsv1.StatefulSetSpec{
VolumeClaimTemplates: []v1.PersistentVolumeClaim{
{
Spec: v1.PersistentVolumeClaimSpec{
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceStorage: resource.MustParse("0.5Gi"),
},
},
},
},
},
},
}
afterSts := &appsv1.StatefulSet{
ObjectMeta: metav1.ObjectMeta{
Name: "teststatefulSet1",
ResourceVersion: "10",
},
Spec: appsv1.StatefulSetSpec{
VolumeClaimTemplates: []v1.PersistentVolumeClaim{
{
Spec: v1.PersistentVolumeClaimSpec{
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceStorage: resource.MustParse("1Gi"),
},
},
},
},
},
},
}
pvcList := &v1.PersistentVolumeClaimList{
Items: []v1.PersistentVolumeClaim{
{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"app.kubernetes.io/component": "redis",
"app.kubernetes.io/name": "teststatefulSet1",
"app.kubernetes.io/part-of": "redis-failover",
},
},
Spec: v1.PersistentVolumeClaimSpec{
VolumeName: "vol-1",
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceStorage: resource.MustParse("0.5Gi"),
},
},
},
},
// resized already
{
Spec: v1.PersistentVolumeClaimSpec{
VolumeName: "vol-2",
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceStorage: resource.MustParse("1Gi"),
},
},
},
},
},
}
// Mock.
mcli := &kubernetes.Clientset{}
mcli.AddReactor("get", "statefulsets", func(action kubetesting.Action) (bool, runtime.Object, error) {
return true, beforeSts, nil
})
mcli.AddReactor("list", "persistentvolumeclaims", func(action kubetesting.Action) (handled bool, ret runtime.Object, err error) {
return true, pvcList, nil
})
mcli.AddReactor("update", "persistentvolumeclaims", func(action kubetesting.Action) (handled bool, ret runtime.Object, err error) {
// update pvc[0]
pvcList.Items[0] = *action.(kubetesting.UpdateActionImpl).Object.(*v1.PersistentVolumeClaim)
return true, action.(kubetesting.UpdateActionImpl).Object, nil
})
service := k8s.NewStatefulSetService(mcli, log.Dummy, metrics.Dummy)
err := service.CreateOrUpdateStatefulSet(testns, afterSts)
assert.NoError(err)
assert.Equal(pvcList.Items[0].Spec.Resources, pvcList.Items[1].Spec.Resources)
// should not call update
mcli.AddReactor("update", "persistentvolumeclaims", func(action kubetesting.Action) (handled bool, ret runtime.Object, err error) {
panic("shouldn't call update")
})
service = k8s.NewStatefulSetService(mcli, log.Dummy, metrics.Dummy)
err = service.CreateOrUpdateStatefulSet(testns, afterSts)
assert.NoError(err)
})
}
}