Skip to content

Commit

Permalink
feat: support pvc expansion (#285)
Browse files Browse the repository at this point in the history
* feat: add sync pvc

* fix: test

* fix

* fix: continue
  • Loading branch information
kqzh authored Sep 21, 2023
1 parent c04f1f9 commit 0d9dbbe
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 0 deletions.
8 changes: 8 additions & 0 deletions pkg/controller/component/graphd_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,10 @@ func (c *graphdCluster) syncGraphdWorkload(nc *v1alpha1.NebulaCluster) error {
}
}

if err := c.syncGraphdPVC(nc); err != nil {
return err
}

if nc.GraphdComponent().IsReady() {
endpoints := nc.GetGraphdEndpoints(v1alpha1.GraphdPortNameHTTP)
if err := updateDynamicFlags(endpoints, newWorkload.GetAnnotations(), oldWorkload.GetAnnotations()); err != nil {
Expand Down Expand Up @@ -205,6 +209,10 @@ func (c *graphdCluster) syncGraphdConfigMap(nc *v1alpha1.NebulaCluster) (*corev1
nc.GraphdComponent().GetConfigMapKey())
}

func (c *graphdCluster) syncGraphdPVC(nc *v1alpha1.NebulaCluster) error {
return syncPVC(nc.GraphdComponent(), c.clientSet.PVC())
}

func (c *graphdCluster) setTopologyZone(nc *v1alpha1.NebulaCluster, newReplicas int32) error {
cmName := fmt.Sprintf("%s-%s", nc.GraphdComponent().GetName(), v1alpha1.ZoneSuffix)
cm, err := c.clientSet.ConfigMap().GetConfigMap(nc.Namespace, cmName)
Expand Down
33 changes: 33 additions & 0 deletions pkg/controller/component/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -504,3 +504,36 @@ func updatePod(clientSet kube.ClientSet, newPod, oldPod *corev1.Pod) error {
func isPending(pod *corev1.Pod) bool {
return pod.Status.Phase == corev1.PodPending
}

// 1. new pvc is not nil, old pvc is not nil, update storage
func syncPVC(
component v1alpha1.NebulaClusterComponent,
pvcClient kube.PersistentVolumeClaim) error {
replicas := int(component.ComponentSpec().Replicas())
volumeClaims, err := component.GenerateVolumeClaim()
if err != nil {
return err
}
for _, volumeClaim := range volumeClaims {
for i := 0; i < replicas; i++ {
pvcName := fmt.Sprintf("%s-%s-%d", volumeClaim.Name, component.GetName(), i)
oldPVC, err := pvcClient.GetPVC(component.GetNamespace(), pvcName)
if err != nil {
if !apierrors.IsNotFound(err) {
return err
}
}
if oldPVC == nil {
continue
}
if volumeClaim.Spec.Resources.Requests.Storage().Cmp(*oldPVC.Spec.Resources.Requests.Storage()) != 0 {
// only update storage
oldPVC.Spec.Resources.Requests = volumeClaim.Spec.Resources.Requests
if err = pvcClient.UpdatePVC(oldPVC); err != nil {
return err
}
}
}
}
return nil
}
8 changes: 8 additions & 0 deletions pkg/controller/component/metad_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,10 @@ func (c *metadCluster) syncMetadWorkload(nc *v1alpha1.NebulaCluster) error {
}
}

if err := c.syncMetadPVC(nc); err != nil {
return err
}

if nc.MetadComponent().IsReady() {
if err := c.setVersion(nc); err != nil {
return err
Expand Down Expand Up @@ -173,6 +177,10 @@ func (c *metadCluster) syncMetadConfigMap(nc *v1alpha1.NebulaCluster) (*corev1.C
nc.MetadComponent().GetConfigMapKey())
}

func (c *metadCluster) syncMetadPVC(nc *v1alpha1.NebulaCluster) error {
return syncPVC(nc.MetadComponent(), c.clientSet.PVC())
}

func (c *metadCluster) setVersion(nc *v1alpha1.NebulaCluster) error {
options, err := nebula.ClientOptions(nc)
if err != nil {
Expand Down
8 changes: 8 additions & 0 deletions pkg/controller/component/storaged_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,10 @@ func (c *storagedCluster) syncStoragedWorkload(nc *v1alpha1.NebulaCluster) error
}
}

if err := c.syncStoragedPVC(nc); err != nil {
return err
}

if nc.StoragedComponent().IsReady() {
endpoints := nc.GetStoragedEndpoints(v1alpha1.StoragedPortNameHTTP)
if err := updateDynamicFlags(endpoints, newWorkload.GetAnnotations(), oldWorkload.GetAnnotations()); err != nil {
Expand Down Expand Up @@ -227,6 +231,10 @@ func (c *storagedCluster) syncStoragedConfigMap(nc *v1alpha1.NebulaCluster) (*co
nc.StoragedComponent().GetConfigMapKey())
}

func (c *storagedCluster) syncStoragedPVC(nc *v1alpha1.NebulaCluster) error {
return syncPVC(nc.StoragedComponent(), c.clientSet.PVC())
}

func (c *storagedCluster) addStorageHosts(nc *v1alpha1.NebulaCluster, oldReplicas, newReplicas int32) error {
options, err := nebula.ClientOptions(nc)
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions pkg/kube/pvc.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,12 +95,14 @@ func (p *pvcClient) UpdatePVC(pvc *corev1.PersistentVolumeClaim) error {
pvcName := pvc.GetName()
pvcLabels := pvc.GetLabels()
annotations := pvc.GetAnnotations()
requests := pvc.Spec.Resources.Requests

return retry.RetryOnConflict(retry.DefaultBackoff, func() error {
if updated, err := p.GetPVC(ns, pvcName); err == nil {
pvc = updated.DeepCopy()
pvc.SetLabels(pvcLabels)
pvc.SetAnnotations(annotations)
pvc.Spec.Resources.Requests = requests
} else {
utilruntime.HandleError(fmt.Errorf("get PV [%s/%s] failed: %v", ns, pvcName, err))
return err
Expand Down

0 comments on commit 0d9dbbe

Please sign in to comment.