Skip to content

Commit

Permalink
add pvc reclaim flag (#49)
Browse files Browse the repository at this point in the history
* add pvc reclaim flag

* remove redundant line

* 1. golint check 2. remove util/controller

* fix typo
  • Loading branch information
MegaByte875 authored Aug 29, 2021
1 parent 9a54bf3 commit 1ec35d3
Show file tree
Hide file tree
Showing 13 changed files with 82 additions and 88 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ run-controller-manager: manifests generate check
run-scheduler: manifests generate check
go run -ldflags '$(LDFLAGS)' cmd/scheduler/main.go

docker-build: build test ## Build docker images.
docker-build: build ## Build docker images.
docker build -t "${DOCKER_REPO}/nebula-operator:${IMAGE_TAG}" images/nebula-operator/

docker-push: docker-build ## Push docker images.
Expand Down
8 changes: 8 additions & 0 deletions apis/apps/v1alpha1/nebulacluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,11 @@ func (nc *NebulaCluster) GenerateOwnerReferences() []metav1.OwnerReference {
},
}
}

func (nc *NebulaCluster) IsPVReclaimEnabled() bool {
enabled := nc.Spec.EnablePVReclaim
if enabled == nil {
return false
}
return *enabled
}
4 changes: 4 additions & 0 deletions apis/apps/v1alpha1/nebulacluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ type NebulaClusterSpec struct {
// +optional
SchedulerName string `json:"schedulerName"`

// Flag to enable/disable pv reclaim while the nebula cluster deleted , default false
// +optional
EnablePVReclaim *bool `json:"enablePVReclaim,omitempty"`

// +kubebuilder:default=IfNotPresent
ImagePullPolicy *corev1.PullPolicy `json:"imagePullPolicy,omitempty"`

Expand Down
5 changes: 5 additions & 0 deletions apis/apps/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions config/crd/bases/apps.nebula-graph.io_nebulaclusters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ spec:
type: object
spec:
properties:
enablePVReclaim:
type: boolean
graphd:
properties:
annotations:
Expand Down
2 changes: 2 additions & 0 deletions pkg/annotation/annotation.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ const (
AnnPodSchedulingKey = "nebula-graph.io/pod-scheduling"
// AnnPodConfigMapHash is pod configmap hash key to update configuration dynamically
AnnPodConfigMapHash = "nebula-graph.io/cm-hash"
// AnnPvReclaimKey is annotation key that indicate whether reclaim persistent volume
AnnPvReclaimKey = "nebula-graph.io/enable-pv-reclaim"

// AnnHaModeVal is annotation value to indicate whether in ha mode
AnnHaModeVal = "true"
Expand Down
32 changes: 28 additions & 4 deletions pkg/controller/component/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
Expand All @@ -32,7 +33,6 @@ import (
"github.com/vesoft-inc/nebula-operator/pkg/kube"
"github.com/vesoft-inc/nebula-operator/pkg/label"
"github.com/vesoft-inc/nebula-operator/pkg/util/config"
controllerutil "github.com/vesoft-inc/nebula-operator/pkg/util/controller"
"github.com/vesoft-inc/nebula-operator/pkg/util/errors"
"github.com/vesoft-inc/nebula-operator/pkg/util/extender"
"github.com/vesoft-inc/nebula-operator/pkg/util/hash"
Expand Down Expand Up @@ -88,7 +88,7 @@ func syncService(component v1alpha1.NebulaClusterComponentter, svcClient kube.Se

oldSvcTmp, err := svcClient.GetService(newSvc.Namespace, newSvc.Name)
if apierrors.IsNotFound(err) {
if err := controllerutil.SetServiceLastAppliedConfigAnnotation(newSvc); err != nil {
if err := setServiceLastAppliedConfigAnnotation(newSvc); err != nil {
return err
}
return svcClient.CreateService(newSvc)
Expand All @@ -99,7 +99,7 @@ func syncService(component v1alpha1.NebulaClusterComponentter, svcClient kube.Se
}

oldSvc := oldSvcTmp.DeepCopy()
equal, err := controllerutil.ServiceEqual(newSvc, oldSvc)
equal, err := serviceEqual(newSvc, oldSvc)
if err != nil {
return err
}
Expand All @@ -110,7 +110,7 @@ func syncService(component v1alpha1.NebulaClusterComponentter, svcClient kube.Se
if !equal || !annoEqual || isOrphan {
svc := *oldSvc
svc.Spec = newSvc.Spec
if err := controllerutil.SetServiceLastAppliedConfigAnnotation(&svc); err != nil {
if err := setServiceLastAppliedConfigAnnotation(&svc); err != nil {
return err
}
if oldSvc.Spec.ClusterIP != "" {
Expand All @@ -131,6 +131,30 @@ func syncService(component v1alpha1.NebulaClusterComponentter, svcClient kube.Se
return nil
}

func setServiceLastAppliedConfigAnnotation(svc *corev1.Service) error {
b, err := json.Marshal(svc.Spec)
if err != nil {
return err
}
if svc.Annotations == nil {
svc.Annotations = map[string]string{}
}
svc.Annotations[annotation.AnnLastAppliedConfigKey] = string(b)
return nil
}

func serviceEqual(newSvc, oldSvc *corev1.Service) (bool, error) {
oldSpec := corev1.ServiceSpec{}
if lastAppliedConfig, ok := oldSvc.Annotations[annotation.AnnLastAppliedConfigKey]; ok {
err := json.Unmarshal([]byte(lastAppliedConfig), &oldSpec)
if err != nil {
return false, err
}
return apiequality.Semantic.DeepEqual(oldSpec, newSvc.Spec), nil
}
return false, nil
}

func syncConfigMap(
component v1alpha1.NebulaClusterComponentter,
cmClient kube.ConfigMap,
Expand Down
3 changes: 3 additions & 0 deletions pkg/controller/component/pvc_gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ func PvcGc(cli client.Client, namespace, clusterName string) error {

for i := range pvcs {
pvc := pvcs[i]
if pvc.Annotations[annotation.AnnPvReclaimKey] == "false" {
continue
}
if err := pvcClient.DeletePVC(pvc.Namespace, pvc.Name); err != nil {
return err
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/controller/component/reclaimer/meta_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,11 @@ func (m *meta) Reconcile(nc *v1alpha1.NebulaCluster) error {
log := getLog().WithValues("namespace", nc.Namespace, "name", nc.Name)
namespace := nc.GetNamespace()
clusterName := nc.GetClusterName()
l, err := label.New().Cluster(clusterName).Selector()
selector, err := label.New().Cluster(clusterName).Selector()
if err != nil {
return err
}
pods, err := m.clientSet.Pod().ListPods(namespace, l)
pods, err := m.clientSet.Pod().ListPods(namespace, selector)
if err != nil {
return fmt.Errorf("list pods for cluster %s/%s failed: %v", namespace, clusterName, err)
}
Expand All @@ -60,7 +60,7 @@ func (m *meta) Reconcile(nc *v1alpha1.NebulaCluster) error {
}
for i := range pvcs {
pvc := pvcs[i]
if err := m.clientSet.PVC().UpdateMetaInfo(pvc, &pod); err != nil {
if err := m.clientSet.PVC().UpdateMetaInfo(pvc, &pod, nc.IsPVReclaimEnabled()); err != nil {
return err
}
if pvc.Spec.VolumeName == "" {
Expand Down
28 changes: 22 additions & 6 deletions pkg/controller/component/reclaimer/pvc_reclaimer.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,41 +58,57 @@ func (p *pvcReclaimer) reclaimPV(nc *v1alpha1.NebulaCluster) error {
pvc := pvcs[i]
pvcName := pvc.GetName()
if !label.Label(pvc.Labels).IsNebulaComponent() {
log.V(4).Info("pvc of cluster skip reclaim for not nebula component", "pvcName", pvcName)
log.V(4).Info("skip reclaim for not nebula component", "pvcName", pvcName)
continue
}

if pvc.Status.Phase != corev1.ClaimBound {
log.V(4).Info("pvc of cluster skip reclaim for pvc status is not bound", "pvcName", pvcName)
log.V(4).Info("skip reclaim for pvc status is not bound", "pvcName", pvcName)
continue
}

if pvc.DeletionTimestamp != nil {
log.V(4).Info("pvc of cluster skip reclaim for pvc has been deleted", "pvcName", pvcName)
log.V(4).Info("skip reclaim for pvc has been deleted", "pvcName", pvcName)
continue
}

if pvc.Annotations[annotation.AnnPVCDeferDeletingKey] == "" {
log.V(4).Info("pvc of cluster skip reclaim for pvc has not been marked as defer deleting pvc", "pvcName", pvcName)
log.V(4).Info("skip reclaim for pvc has not been marked as defer deleting pvc", "pvcName", pvcName)
continue
}

podName, exist := pvc.Annotations[annotation.AnnPodNameKey]
if !exist {
log.V(4).Info("pvc of cluster skip reclaim for pvc has no pod name annotation", "pvcName", pvcName)
log.V(4).Info("skip reclaim for pvc has no pod name annotation", "pvcName", pvcName)
continue
}

_, err := p.clientSet.Pod().GetPod(namespace, podName)
if err == nil {
log.V(4).Info("pvc of cluster skip reclaim for pvc is still referenced by a pod", "pvcName", pvcName)
log.V(4).Info("skip reclaim for pvc is still referenced by a pod", "pvcName", pvcName)
continue
}
if !apierrors.IsNotFound(err) {
return fmt.Errorf("cluster %s/%s get pvc %s pod %s from cache failed: %v", namespace, ncName, pvcName, podName, err)
}

pvName := pvc.Spec.VolumeName
pv, err := p.clientSet.PV().GetPersistentVolume(pvName)
if err != nil {
if apierrors.IsNotFound(err) {
continue
}
return fmt.Errorf("cluster %s/%s get pvc %s pv %s failed: %v", namespace, ncName, pvcName, pvName, err)
}

if pv.Spec.PersistentVolumeReclaimPolicy != corev1.PersistentVolumeReclaimDelete {
if err := p.clientSet.PV().PatchPVReclaimPolicy(pv, corev1.PersistentVolumeReclaimDelete); err != nil {
return fmt.Errorf("cluster %s/%s patch pv %s to %s failed: %v", namespace, ncName, pvName,
corev1.PersistentVolumeReclaimDelete, err)
}
log.Info("patch pv policy to Delete success", "pvName", pvName)
}

if err := p.clientSet.PVC().DeletePVC(pvc.Namespace, pvcName); err != nil {
return fmt.Errorf("cluster %s/%s delete pvc %s failed: %v", namespace, ncName, pvcName, err)
}
Expand Down
6 changes: 4 additions & 2 deletions pkg/kube/pvc.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package kube

import (
"context"
"strconv"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
Expand All @@ -31,7 +32,7 @@ import (
type PersistentVolumeClaim interface {
CreatePVC(pvc *corev1.PersistentVolumeClaim) error
GetPVC(namespace, name string) (*corev1.PersistentVolumeClaim, error)
UpdateMetaInfo(pvc *corev1.PersistentVolumeClaim, pod *corev1.Pod) error
UpdateMetaInfo(pvc *corev1.PersistentVolumeClaim, pod *corev1.Pod, isReclaimEnabled bool) error
UpdatePVC(pvc *corev1.PersistentVolumeClaim) error
DeletePVC(namespace string, name string) error
ListPVCs(namespace string, selector labels.Selector) ([]corev1.PersistentVolumeClaim, error)
Expand Down Expand Up @@ -74,13 +75,14 @@ func (p *pvcClient) ListPVCs(namespace string, selector labels.Selector) ([]core
return pvcList.Items, nil
}

func (p *pvcClient) UpdateMetaInfo(pvc *corev1.PersistentVolumeClaim, pod *corev1.Pod) error {
func (p *pvcClient) UpdateMetaInfo(pvc *corev1.PersistentVolumeClaim, pod *corev1.Pod, isReclaimEnabled bool) error {
podName := pod.GetName()
if pvc.Annotations == nil {
pvc.Annotations = make(map[string]string)
}
pvc.Labels[annotation.AnnPodNameKey] = podName
pvc.Annotations[annotation.AnnPodNameKey] = podName
pvc.Annotations[annotation.AnnPvReclaimKey] = strconv.FormatBool(isReclaimEnabled)

return p.UpdatePVC(pvc)
}
Expand Down
51 changes: 0 additions & 51 deletions pkg/util/controller/controller.go

This file was deleted.

21 changes: 0 additions & 21 deletions pkg/util/controller/log.go

This file was deleted.

0 comments on commit 1ec35d3

Please sign in to comment.