From 0d9dbbeeae10ae69f8a7bd8e53006e1ae15aef1a Mon Sep 17 00:00:00 2001
From: Kenshin <35095889+kqzh@users.noreply.github.com>
Date: Thu, 21 Sep 2023 10:25:46 +0800
Subject: [PATCH] feat: support pvc expansion (#285)

* feat: add sync pvc

* fix: test

* fix

* fix: continue
---
 pkg/controller/component/graphd_cluster.go   |  8 +++++
 pkg/controller/component/helper.go           | 33 ++++++++++++++++++++
 pkg/controller/component/metad_cluster.go    |  8 +++++
 pkg/controller/component/storaged_cluster.go |  8 +++++
 pkg/kube/pvc.go                              |  2 ++
 5 files changed, 59 insertions(+)

diff --git a/pkg/controller/component/graphd_cluster.go b/pkg/controller/component/graphd_cluster.go
index b9b45ab3..42bc7e81 100644
--- a/pkg/controller/component/graphd_cluster.go
+++ b/pkg/controller/component/graphd_cluster.go
@@ -134,6 +134,10 @@ func (c *graphdCluster) syncGraphdWorkload(nc *v1alpha1.NebulaCluster) error {
 		}
 	}
 
+	if err := c.syncGraphdPVC(nc); err != nil {
+		return err
+	}
+
 	if nc.GraphdComponent().IsReady() {
 		endpoints := nc.GetGraphdEndpoints(v1alpha1.GraphdPortNameHTTP)
 		if err := updateDynamicFlags(endpoints, newWorkload.GetAnnotations(), oldWorkload.GetAnnotations()); err != nil {
@@ -205,6 +209,10 @@ func (c *graphdCluster) syncGraphdConfigMap(nc *v1alpha1.NebulaCluster) (*corev1
 		nc.GraphdComponent().GetConfigMapKey())
 }
 
+func (c *graphdCluster) syncGraphdPVC(nc *v1alpha1.NebulaCluster) error {
+	return syncPVC(nc.GraphdComponent(), c.clientSet.PVC())
+}
+
 func (c *graphdCluster) setTopologyZone(nc *v1alpha1.NebulaCluster, newReplicas int32) error {
 	cmName := fmt.Sprintf("%s-%s", nc.GraphdComponent().GetName(), v1alpha1.ZoneSuffix)
 	cm, err := c.clientSet.ConfigMap().GetConfigMap(nc.Namespace, cmName)
diff --git a/pkg/controller/component/helper.go b/pkg/controller/component/helper.go
index 55d8b26e..fd702c68 100644
--- a/pkg/controller/component/helper.go
+++ b/pkg/controller/component/helper.go
@@ -504,3 +504,36 @@ func updatePod(clientSet kube.ClientSet, newPod, oldPod *corev1.Pod) error {
 func isPending(pod *corev1.Pod) bool {
 	return pod.Status.Phase == corev1.PodPending
 }
+
+// syncPVC updates only the storage request of an existing PVC when it differs from the desired volume claim.
+func syncPVC(
+	component v1alpha1.NebulaClusterComponent,
+	pvcClient kube.PersistentVolumeClaim) error {
+	replicas := int(component.ComponentSpec().Replicas())
+	volumeClaims, err := component.GenerateVolumeClaim()
+	if err != nil {
+		return err
+	}
+	for _, volumeClaim := range volumeClaims {
+		for i := 0; i < replicas; i++ {
+			pvcName := fmt.Sprintf("%s-%s-%d", volumeClaim.Name, component.GetName(), i)
+			oldPVC, err := pvcClient.GetPVC(component.GetNamespace(), pvcName)
+			if err != nil {
+				if !apierrors.IsNotFound(err) {
+					return err
+				}
+			}
+			if oldPVC == nil {
+				continue
+			}
+			if volumeClaim.Spec.Resources.Requests.Storage().Cmp(*oldPVC.Spec.Resources.Requests.Storage()) != 0 {
+				// only update storage
+				oldPVC.Spec.Resources.Requests = volumeClaim.Spec.Resources.Requests
+				if err = pvcClient.UpdatePVC(oldPVC); err != nil {
+					return err
+				}
+			}
+		}
+	}
+	return nil
+}
diff --git a/pkg/controller/component/metad_cluster.go b/pkg/controller/component/metad_cluster.go
index 1186ad2a..e0148900 100644
--- a/pkg/controller/component/metad_cluster.go
+++ b/pkg/controller/component/metad_cluster.go
@@ -131,6 +131,10 @@ func (c *metadCluster) syncMetadWorkload(nc *v1alpha1.NebulaCluster) error {
 		}
 	}
 
+	if err := c.syncMetadPVC(nc); err != nil {
+		return err
+	}
+
 	if nc.MetadComponent().IsReady() {
 		if err := c.setVersion(nc); err != nil {
 			return err
@@ -173,6 +177,10 @@ func (c *metadCluster) syncMetadConfigMap(nc *v1alpha1.NebulaCluster) (*corev1.C
 		nc.MetadComponent().GetConfigMapKey())
 }
 
+func (c *metadCluster) syncMetadPVC(nc *v1alpha1.NebulaCluster) error {
+	return syncPVC(nc.MetadComponent(), c.clientSet.PVC())
+}
+
 func (c *metadCluster) setVersion(nc *v1alpha1.NebulaCluster) error {
 	options, err := nebula.ClientOptions(nc)
 	if err != nil {
diff --git a/pkg/controller/component/storaged_cluster.go b/pkg/controller/component/storaged_cluster.go
index 5cd4108d..e6f7f3ae 100644
--- a/pkg/controller/component/storaged_cluster.go
+++ b/pkg/controller/component/storaged_cluster.go
@@ -178,6 +178,10 @@ func (c *storagedCluster) syncStoragedWorkload(nc *v1alpha1.NebulaCluster) error
 		}
 	}
 
+	if err := c.syncStoragedPVC(nc); err != nil {
+		return err
+	}
+
 	if nc.StoragedComponent().IsReady() {
 		endpoints := nc.GetStoragedEndpoints(v1alpha1.StoragedPortNameHTTP)
 		if err := updateDynamicFlags(endpoints, newWorkload.GetAnnotations(), oldWorkload.GetAnnotations()); err != nil {
@@ -227,6 +231,10 @@ func (c *storagedCluster) syncStoragedConfigMap(nc *v1alpha1.NebulaCluster) (*co
 		nc.StoragedComponent().GetConfigMapKey())
 }
 
+func (c *storagedCluster) syncStoragedPVC(nc *v1alpha1.NebulaCluster) error {
+	return syncPVC(nc.StoragedComponent(), c.clientSet.PVC())
+}
+
 func (c *storagedCluster) addStorageHosts(nc *v1alpha1.NebulaCluster, oldReplicas, newReplicas int32) error {
 	options, err := nebula.ClientOptions(nc)
 	if err != nil {
diff --git a/pkg/kube/pvc.go b/pkg/kube/pvc.go
index aa024535..cf909d5f 100644
--- a/pkg/kube/pvc.go
+++ b/pkg/kube/pvc.go
@@ -95,12 +95,14 @@ func (p *pvcClient) UpdatePVC(pvc *corev1.PersistentVolumeClaim) error {
 	pvcName := pvc.GetName()
 	pvcLabels := pvc.GetLabels()
 	annotations := pvc.GetAnnotations()
+	requests := pvc.Spec.Resources.Requests
 
 	return retry.RetryOnConflict(retry.DefaultBackoff, func() error {
 		if updated, err := p.GetPVC(ns, pvcName); err == nil {
 			pvc = updated.DeepCopy()
 			pvc.SetLabels(pvcLabels)
 			pvc.SetAnnotations(annotations)
+			pvc.Spec.Resources.Requests = requests
 		} else {
 			utilruntime.HandleError(fmt.Errorf("get PVC [%s/%s] failed: %v", ns, pvcName, err))
 			return err
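
Note: in-place PVC expansion only succeeds when the claim's StorageClass sets allowVolumeExpansion: true; otherwise the API server rejects the update that syncPVC issues. The standalone sketch below mirrors the size check syncPVC performs before calling UpdatePVC -- the desired/current names and the example sizes are illustrative, not taken from this change:

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
)

func main() {
	// Desired size from the component's volume claim template and
	// the size recorded on the live PVC (example values).
	desired := resource.MustParse("20Gi")
	current := resource.MustParse("10Gi")

	// The same comparison syncPVC uses: Cmp returns 0 when the two
	// quantities are equal, so unchanged claims are skipped and only
	// a real size difference triggers an update of the storage request.
	if desired.Cmp(current) != 0 {
		newRequests := corev1.ResourceList{corev1.ResourceStorage: desired}
		_ = newRequests // would replace oldPVC.Spec.Resources.Requests
		fmt.Printf("resize needed: %s -> %s\n", current.String(), desired.String())
	}
}

Since the API server forbids decreasing spec.resources.requests.storage on a bound claim, this comparison can only grow volumes in practice.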