Skip to content

Commit

Permalink
Add resize pvcs tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
d-kuro committed May 18, 2022
1 parent 3df1e1e commit 20273d6
Show file tree
Hide file tree
Showing 4 changed files with 356 additions and 63 deletions.
9 changes: 5 additions & 4 deletions api/v1beta2/mysqlcluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ func (s MySQLClusterSpec) validateUpdate(old MySQLClusterSpec) field.ErrorList {
p := p.Child("restore")
allErrs = append(allErrs, field.Forbidden(p, "not editable"))
}

return append(allErrs, s.validateCreate()...)
}

Expand Down Expand Up @@ -502,9 +502,10 @@ type MySQLClusterConditionType string

// Valid values for MySQLClusterConditionType
const (
ConditionInitialized MySQLClusterConditionType = "Initialized"
ConditionAvailable MySQLClusterConditionType = "Available"
ConditionHealthy MySQLClusterConditionType = "Healthy"
ConditionInitialized MySQLClusterConditionType = "Initialized"
ConditionAvailable MySQLClusterConditionType = "Available"
ConditionHealthy MySQLClusterConditionType = "Healthy"
ConditionVolumeResized MySQLClusterConditionType = "VolumeResized"
)

// BackupStatus represents the status of the last successful backup.
Expand Down
1 change: 1 addition & 0 deletions controllers/mysqlcluster_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -686,6 +686,7 @@ var _ = Describe("MySQLCluster reconciler", func() {
if cluster.Status.ReconcileInfo.Generation != cluster.Generation {
return fmt.Errorf("status is not updated")
}

return nil
}).Should(Succeed())

Expand Down
163 changes: 104 additions & 59 deletions controllers/resize_pvc.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,21 @@ import (
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
storagev1 "k8s.io/api/storage/v1"
"k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
corev1ac "k8s.io/client-go/applyconfigurations/core/v1"
"k8s.io/utils/pointer"
"k8s.io/client-go/util/retry"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
crlog "sigs.k8s.io/controller-runtime/pkg/log"
)

var (
ErrReduceVolumeSize = errors.New("cannot reduce volume size")
)

// reconcilePVC resizes the PVC as needed.
// Since the PVC template of the StatefulSet is unchangeable, the following steps are required to resize the PVC
//
Expand All @@ -45,30 +47,44 @@ func (r *MySQLClusterReconciler) reconcilePVC(ctx context.Context, req ctrl.Requ
return nil
}

resizeTarget, ok := r.needResizePVC(cluster, &sts)
resizeTarget, ok, err := r.needResizePVC(cluster, &sts)
if !ok {
return nil
}
if err != nil {
if condErr := r.updateResizeCondition(ctx, req, corev1.ConditionFalse, err.Error()); condErr != nil {
return fmt.Errorf("failed to update resize condition status fields in MySQLCluster: %s: %w", condErr, err)
}

return err
}

log.Info("Starting PVC resize")

patches, err := r.findPVCs(ctx, cluster, &sts, resizeTarget)
if err != nil {
return fmt.Errorf("failed to resize PVC: %w", err)
}
if err := r.resizePVCs(ctx, cluster, &sts, resizeTarget); err != nil {
if condErr := r.updateResizeCondition(ctx, req, corev1.ConditionFalse, err.Error()); condErr != nil {
return fmt.Errorf("failed to update resize condition status: %s: %w", condErr, err)
}

if err := r.resizePVCs(ctx, patches); err != nil {
return fmt.Errorf("failed to resize PVCs: %w", err)
return err
}

if err := r.deleteStatefulSet(ctx, &sts); err != nil {
return fmt.Errorf("failed to delete StatefulSet: %w", err)
if condErr := r.updateResizeCondition(ctx, req, corev1.ConditionFalse, err.Error()); condErr != nil {
return fmt.Errorf("failed to update resize condition status: %s: %w", condErr, err)
}

return err
}

if err := r.updateResizeCondition(ctx, req, corev1.ConditionTrue, "successfully resized pvc"); err != nil {
return fmt.Errorf("failed to update resize condition status: %w", err)
}

return nil
}

func (r *MySQLClusterReconciler) findPVCs(ctx context.Context, cluster *mocov1beta2.MySQLCluster, sts *appsv1.StatefulSet, resizeTarget map[string]corev1.PersistentVolumeClaim) (map[types.NamespacedName]*unstructured.Unstructured, error) {
func (r *MySQLClusterReconciler) resizePVCs(ctx context.Context, cluster *mocov1beta2.MySQLCluster, sts *appsv1.StatefulSet, resizeTarget map[string]corev1.PersistentVolumeClaim) error {
log := crlog.FromContext(ctx)

newSizes := make(map[string]*resource.Quantity)
Expand All @@ -80,9 +96,15 @@ func (r *MySQLClusterReconciler) findPVCs(ctx context.Context, cluster *mocov1be
newSizes[pvc.Name] = newSize
}

pvcsToKeep := make(map[string]*resource.Quantity, int(*sts.Spec.Replicas)*len(resizeTarget))
var replicas int32
if sts.Spec.Replicas == nil {
replicas = 1
} else {
replicas = *sts.Spec.Replicas
}
pvcsToKeep := make(map[string]*resource.Quantity, replicas*int32(len(resizeTarget)))
for _, pvc := range resizeTarget {
for i := int32(0); i < *sts.Spec.Replicas; i++ {
for i := int32(0); i < replicas; i++ {
name := fmt.Sprintf("%s-%s-%d", pvc.Name, sts.Name, i)
newSize := newSizes[pvc.Name]
pvcsToKeep[name] = newSize
Expand All @@ -91,16 +113,14 @@ func (r *MySQLClusterReconciler) findPVCs(ctx context.Context, cluster *mocov1be

selector, err := metav1.LabelSelectorAsSelector(sts.Spec.Selector)
if err != nil {
return nil, fmt.Errorf("failed to parse selector: %w", err)
return fmt.Errorf("failed to parse selector: %w", err)
}

var pvcs corev1.PersistentVolumeClaimList
if err := r.Client.List(ctx, &pvcs, client.MatchingLabelsSelector{Selector: selector}); err != nil {
return nil, fmt.Errorf("failed to list PVCs: %w", err)
return fmt.Errorf("failed to list PVCs: %w", err)
}

patches := make(map[types.NamespacedName]*unstructured.Unstructured)

for _, pvc := range pvcs.Items {
newSize, ok := pvcsToKeep[pvc.Name]
if !ok {
Expand All @@ -109,48 +129,25 @@ func (r *MySQLClusterReconciler) findPVCs(ctx context.Context, cluster *mocov1be

supported, err := r.isVolumeExpansionSupported(ctx, &pvc)
if err != nil {
return nil, fmt.Errorf("failed to check if volume expansion is supported: %w", err)
return fmt.Errorf("failed to check if volume expansion is supported: %w", err)
}
if !supported {
log.Info("StorageClass used by PVC does not support volume expansion, skipped", "storageClassName", *pvc.Spec.StorageClassName, "pvcName", pvc.Name)
continue
}

pvcac := corev1ac.PersistentVolumeClaim(pvc.Name, pvc.Namespace).
WithSpec(corev1ac.PersistentVolumeClaimSpec().
WithResources(corev1ac.ResourceRequirements().
WithRequests(corev1.ResourceList{corev1.ResourceStorage: *newSize}),
),
)

obj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(pvcac)
if err != nil {
return nil, fmt.Errorf("failed to convert PVC %s/%s to unstructured: %w", pvc.Namespace, pvc.Name, err)
}

patches[types.NamespacedName{Namespace: pvc.Namespace, Name: pvc.Name}] = &unstructured.Unstructured{Object: obj}
}

if len(patches) == 0 {
return nil, errors.New("could not find resizable PVCs")
}

return patches, nil
}

func (r *MySQLClusterReconciler) resizePVCs(ctx context.Context, patches map[types.NamespacedName]*unstructured.Unstructured) error {
log := crlog.FromContext(ctx)

for key, patch := range patches {
err := r.Patch(ctx, patch, client.Apply, &client.PatchOptions{
FieldManager: fieldManager,
Force: pointer.Bool(true),
})
if err != nil {
return fmt.Errorf("failed to patch PVC %s/%s: %w", key.Namespace, key.Name, err)
switch i := pvc.Spec.Resources.Requests.Storage().Cmp(*newSize); {
case i == 0: // volume size is equal
continue
case i == 1: // current volume size is greater than new size
return fmt.Errorf("failed to resize pvc %q, want size: %s, deployed size: %s: %w", pvc.Name, newSize.String(), pvc.Spec.Resources.Requests.Storage().String(), ErrReduceVolumeSize)
case i == -1: // current volume size is smaller than new size
pvc.Spec.Resources.Requests[corev1.ResourceStorage] = *newSize

if err := r.Client.Update(ctx, &pvc); err != nil {
return fmt.Errorf("failed to update PVC: %w", err)
}
}

log.Info("Resized PVC", "pvcName", key.Name)
}

return nil
Expand All @@ -169,9 +166,9 @@ func (r *MySQLClusterReconciler) deleteStatefulSet(ctx context.Context, sts *app
return nil
}

func (r *MySQLClusterReconciler) needResizePVC(cluster *mocov1beta2.MySQLCluster, sts *appsv1.StatefulSet) (map[string]corev1.PersistentVolumeClaim, bool) {
func (r *MySQLClusterReconciler) needResizePVC(cluster *mocov1beta2.MySQLCluster, sts *appsv1.StatefulSet) (map[string]corev1.PersistentVolumeClaim, bool, error) {
if len(sts.Spec.VolumeClaimTemplates) == 0 {
return nil, false
return nil, false, nil
}

pvcSet := make(map[string]corev1.PersistentVolumeClaim, len(sts.Spec.VolumeClaimTemplates))
Expand All @@ -190,17 +187,22 @@ func (r *MySQLClusterReconciler) needResizePVC(cluster *mocov1beta2.MySQLCluster
deployedSize := current.Spec.Resources.Requests.Storage()
wantSize := pvc.Spec.Resources.Requests.Storage()

if deployedSize.Equal(wantSize.DeepCopy()) {
switch i := deployedSize.Cmp(wantSize.DeepCopy()); {
case i == 0: // volume size is equal
delete(pvcSet, pvc.Name)
continue
case i == 1: // volume size is greater
return nil, false, fmt.Errorf("failed to resize pvc %q, want size: %s, deployed size: %s: %w", pvc.Name, wantSize, deployedSize, ErrReduceVolumeSize)
case i == -1: // volume size is smaller
continue
}
}

if len(pvcSet) == 0 {
return nil, false
return nil, false, nil
}

return pvcSet, true
return pvcSet, true, nil
}

func (r *MySQLClusterReconciler) isVolumeExpansionSupported(ctx context.Context, pvc *corev1.PersistentVolumeClaim) (bool, error) {
Expand Down Expand Up @@ -235,3 +237,46 @@ func (MySQLClusterReconciler) isUpdatingStatefulSet(sts *appsv1.StatefulSet) boo

return false
}

func (r *MySQLClusterReconciler) updateResizeCondition(ctx context.Context, req ctrl.Request, status corev1.ConditionStatus, msg string) error {
return retry.RetryOnConflict(retry.DefaultRetry, func() error {
now := metav1.Now()

var cluster mocov1beta2.MySQLCluster
if err := r.Client.Get(ctx, types.NamespacedName{Name: req.Name, Namespace: req.Namespace}, &cluster); err != nil {
return fmt.Errorf("failed to get MySQLCluster %s/%s: %w", req.Namespace, req.Name, err)
}
orig := cluster.DeepCopy()

newCond := mocov1beta2.MySQLClusterCondition{
Type: mocov1beta2.ConditionVolumeResized,
Status: status,
Message: msg,
LastTransitionTime: now,
}

find := false

for i, cond := range cluster.Status.Conditions {
if cond.Type != mocov1beta2.ConditionVolumeResized {
continue
}
find = true
if cond.Status == status {
newCond.LastTransitionTime = cond.LastTransitionTime
}
cluster.Status.Conditions[i] = newCond
}

if !find {
cluster.Status.Conditions = append(cluster.Status.Conditions, newCond)
}

// if nothing has changed, skip updating.
if equality.Semantic.DeepEqual(orig, cluster) {
return nil
}

return r.Client.Status().Update(ctx, &cluster)
})
}
Loading

0 comments on commit 20273d6

Please sign in to comment.