From 09aa7fc18111deab97413ecf0a8914349836db44 Mon Sep 17 00:00:00 2001 From: Shubham Bajpai Date: Fri, 12 Jun 2020 18:04:05 +0530 Subject: [PATCH] feat(migrate): add support for cstor volume migration (#9) Commits add support for CSI volume migration Signed-off-by: shubham --- .travis.yml | 2 +- Makefile | 3 + build/deploy.sh | 3 + cmd/migrate/executor/cstor_volume.go | 83 ++ cmd/migrate/executor/options.go | 1 + cmd/migrate/executor/setup_job.go | 1 + pkg/migrate/cstor/pool.go | 88 +- pkg/migrate/cstor/volume.go | 1027 ++++++++++++++++++++++++ pkg/migrate/cstor/volume_operations.go | 154 ++++ 9 files changed, 1322 insertions(+), 40 deletions(-) create mode 100644 cmd/migrate/executor/cstor_volume.go create mode 100644 pkg/migrate/cstor/volume.go create mode 100644 pkg/migrate/cstor/volume_operations.go diff --git a/.travis.yml b/.travis.yml index 350327c9..8cbca670 100644 --- a/.travis.yml +++ b/.travis.yml @@ -37,7 +37,7 @@ before_script: # TODO add golangci yaml config script: - make test - if [ "$TRAVIS_CPU_ARCH" == "amd64" ]; then - make upgrade-image.amd64; + make all.amd64; fi after_success: diff --git a/Makefile b/Makefile index e2c97b20..cb9745a4 100644 --- a/Makefile +++ b/Makefile @@ -129,6 +129,9 @@ cleanup-upgrade: include ./build/migrate/Makefile.mk +.PHONY: all.amd64 +all.amd64: upgrade-image.amd64 migrate-image.amd64 + # Push images .PHONY: deploy-images deploy-images: diff --git a/build/deploy.sh b/build/deploy.sh index 20fef701..565e2077 100755 --- a/build/deploy.sh +++ b/build/deploy.sh @@ -20,9 +20,12 @@ ARCH=$(uname -m) if [ "${ARCH}" = "x86_64" ]; then UPGRADE_IMG="${IMAGE_ORG}/upgrade-amd64" + MIGRATE_IMG="${IMAGE_ORG}/migrate-amd64" elif [ "${ARCH}" = "aarch64" ]; then UPGRADE_IMG="${IMAGE_ORG}/upgrade-arm64" + MIGRATE_IMG="${IMAGE_ORG}/migrate-arm64" fi # tag and push all the images DIMAGE="${UPGRADE_IMG}" ./build/push +DIMAGE="${MIGRATE_IMG}" ./build/push \ No newline at end of file diff --git a/cmd/migrate/executor/cstor_volume.go b/cmd/migrate/executor/cstor_volume.go new file mode 100644 index 00000000..e5e25f66 --- /dev/null +++ b/cmd/migrate/executor/cstor_volume.go @@ -0,0 +1,83 @@ +/* +Copyright 2020 The OpenEBS Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package executor + +import ( + "strings" + + "github.com/openebs/maya/pkg/util" + cstor "github.com/openebs/upgrade/pkg/migrate/cstor" + "github.com/spf13/cobra" + "k8s.io/klog" + + errors "github.com/pkg/errors" +) + +var ( + cstorVolumeMigrateCmdHelpText = ` +This command migrates the cStor Volume to csi format + +Usage: migrate cstor-volume --pv-name +` +) + +// NewMigrateCStorVolumeJob migrates all the cStor Pools associated with +// a given Storage Pool Claim +func NewMigrateCStorVolumeJob() *cobra.Command { + cmd := &cobra.Command{ + Use: "cstor-volume", + Short: "Migrate cStor Volume", + Long: cstorVolumeMigrateCmdHelpText, + Example: `migrate cstor-volume `, + Run: func(cmd *cobra.Command, args []string) { + util.CheckErr(options.RunPreFlightChecks(), util.Fatal) + util.CheckErr(options.RunCStorVolumeMigrateChecks(), util.Fatal) + util.CheckErr(options.RunCStorVolumeMigrate(), util.Fatal) + }, + } + + cmd.Flags().StringVarP(&options.pvName, + "pv-name", "", + options.pvName, + "cstor Volume name to be migrated. Run \"kubectl get pv\", to get pv-name") + + return cmd +} + +// RunCStorVolumeMigrateChecks will ensure the sanity of the cstor Volume migrate options +func (m *MigrateOptions) RunCStorVolumeMigrateChecks() error { + if len(strings.TrimSpace(m.pvName)) == 0 { + return errors.Errorf("Cannot execute migrate job: cstor pv name is missing") + } + + return nil +} + +// RunCStorVolumeMigrate migrates the given pv. +func (m *MigrateOptions) RunCStorVolumeMigrate() error { + + klog.Infof("Migrating volume %s to csi spec", m.pvName) + migrator := cstor.VolumeMigrator{} + err := migrator.Migrate(m.pvName, m.openebsNamespace) + if err != nil { + klog.Error(err) + return errors.Errorf("Failed to migrate cStor Volume : %s", m.pvName) + } + klog.Infof("Successfully migrated volume %s", m.pvName) + + return nil +} diff --git a/cmd/migrate/executor/options.go b/cmd/migrate/executor/options.go index 5d90663d..39de1e11 100644 --- a/cmd/migrate/executor/options.go +++ b/cmd/migrate/executor/options.go @@ -27,6 +27,7 @@ import ( type MigrateOptions struct { openebsNamespace string spcName string + pvName string } var ( diff --git a/cmd/migrate/executor/setup_job.go b/cmd/migrate/executor/setup_job.go index 4e94691d..c6387185 100644 --- a/cmd/migrate/executor/setup_job.go +++ b/cmd/migrate/executor/setup_job.go @@ -37,6 +37,7 @@ func NewJob() *cobra.Command { cmd.AddCommand( NewMigratePoolJob(), + NewMigrateCStorVolumeJob(), ) cmd.PersistentFlags().StringVarP(&options.openebsNamespace, diff --git a/pkg/migrate/cstor/pool.go b/pkg/migrate/cstor/pool.go index 273fbafd..9c7e9b67 100644 --- a/pkg/migrate/cstor/pool.go +++ b/pkg/migrate/cstor/pool.go @@ -33,7 +33,6 @@ import ( csp "github.com/openebs/maya/pkg/cstor/pool/v1alpha3" cvr "github.com/openebs/maya/pkg/cstor/volumereplica/v1alpha1" spc "github.com/openebs/maya/pkg/storagepoolclaim/v1alpha1" - "github.com/openebs/maya/pkg/util/retry" "github.com/pkg/errors" k8serrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -54,6 +53,7 @@ const ( cspiHostnameAnnotation = "cstorpoolinstance.openebs.io/hostname" spcFinalizer = "storagepoolclaim.openebs.io/finalizer" cspcFinalizer = "cstorpoolcluster.openebs.io/finalizer" + cspcKind = "CStorPoolCluster" ) // CSPCMigrator ... @@ -165,6 +165,10 @@ func (c *CSPCMigrator) migrate(spcName string) error { return err } } + err = addSkipAnnotationToSPC(c.SPCObj) + if err != nil { + return errors.Wrap(err, "failed to add skip-validation annotation") + } // Clean up old SPC resources after the migration is complete err = spc.NewKubeClient(). Delete(spcName, &metav1.DeleteOptions{}) @@ -268,25 +272,20 @@ func (c *CSPCMigrator) cspTocspi(cspiObj *cstor.CStorPoolInstance) error { return err } } - err = retry. - Times(60). - Wait(5 * time.Second). - Try(func(attempt uint) error { - klog.Infof("waiting for cspi %s to come to ONLINE state", cspiObj.Name) - cspiObj, err1 = c.OpenebsClientset.CstorV1(). - CStorPoolInstances(c.OpenebsNamespace). - Get(cspiObj.Name, metav1.GetOptions{}) - if err1 != nil { - return err1 - } - if cspiObj.Status.Phase != "ONLINE" { - return errors.Errorf("failed to verify cspi %s phase expected: ONLINE got: %s", - cspiObj.Name, cspiObj.Status.Phase) + for { + cspiObj, err1 = c.OpenebsClientset.CstorV1(). + CStorPoolInstances(c.OpenebsNamespace). + Get(cspiObj.Name, metav1.GetOptions{}) + if err1 != nil { + klog.Errorf("failed to get cspi %s: %s", cspiObj.Name, err1.Error()) + } else { + if cspiObj.Status.Phase == "ONLINE" { + break } - return nil - }) - if err != nil { - return err + klog.Infof("waiting for cspi %s to come to ONLINE state, got %s", + cspiObj.Name, cspiObj.Status.Phase) + } + time.Sleep(10 * time.Second) } err = c.updateCVRsLabels(cspObj, cspiObj) if err != nil { @@ -354,25 +353,23 @@ func (c *CSPCMigrator) scaleDownDeployment(cspObj *apis.CStorPool, openebsNamesp if err != nil { return err } - err = retry. - Times(60). - Wait(5 * time.Second). - Try(func(attempt uint) error { - klog.Infof("waiting for csp %s deployment to scale down", cspObj.Name) - cspPods, err1 := c.KubeClientset.CoreV1(). - Pods(openebsNamespace). - List(metav1.ListOptions{ - LabelSelector: "openebs.io/cstor-pool=" + cspObj.Name, - }) - if err1 != nil { - return errors.Wrapf(err1, "failed to get csp deploy") + for { + cspPods, err1 := c.KubeClientset.CoreV1(). + Pods(openebsNamespace). + List(metav1.ListOptions{ + LabelSelector: "openebs.io/cstor-pool=" + cspObj.Name, + }) + if err1 != nil { + klog.Errorf("failed to list pods for csp %s deployment: %s", cspObj.Name, err1.Error()) + } else { + if len(cspPods.Items) == 0 { + break } - if len(cspPods.Items) != 0 { - return errors.Errorf("failed to scale down csp deployment") - } - return nil - }) - return err + klog.Infof("waiting for csp %s deployment to scale down", cspObj.Name) + } + time.Sleep(10 * time.Second) + } + return nil } // Update the bdc with the cspc labels instead of spc labels to allow @@ -418,13 +415,13 @@ func (c *CSPCMigrator) updateBDCOwnerRef() error { return err } for _, bdcItem := range bdcList.Items { - if bdcItem.OwnerReferences[0].Kind != "CStorPoolCluster" { + if bdcItem.OwnerReferences[0].Kind != cspcKind { bdcItem := bdcItem // pin it bdcObj := &bdcItem klog.Infof("Updating bdc %s with cspc %s ownerRef.", bdcObj.Name, c.CSPCObj.Name) bdcObj.OwnerReferences = []metav1.OwnerReference{ *metav1.NewControllerRef(c.CSPCObj, - apis.SchemeGroupVersion.WithKind(c.CSPCObj.Kind)), + cstor.SchemeGroupVersion.WithKind(cspcKind)), } _, err := c.OpenebsClientset.OpenebsV1alpha1().BlockDeviceClaims(c.OpenebsNamespace). Update(bdcObj) @@ -466,3 +463,16 @@ func (c *CSPCMigrator) updateCVRsLabels(cspObj *apis.CStorPool, cspiObj *cstor.C } return nil } + +func addSkipAnnotationToSPC(spcObj *apis.StoragePoolClaim) error { +retry: + spcObj.Annotations = map[string]string{ + "openebs.io/skip-validations": "true", + } + _, err := spc.NewKubeClient().Update(spcObj) + if k8serrors.IsConflict(err) { + klog.Errorf("failed to update spc with skip-validation annotation due to conflict error") + goto retry + } + return err +} diff --git a/pkg/migrate/cstor/volume.go b/pkg/migrate/cstor/volume.go new file mode 100644 index 00000000..41c13a6e --- /dev/null +++ b/pkg/migrate/cstor/volume.go @@ -0,0 +1,1027 @@ +/* +Copyright 2020 The OpenEBS Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package migrate + +import ( + "fmt" + "math/rand" + "strconv" + "strings" + "time" + + goversion "github.com/hashicorp/go-version" + cstor "github.com/openebs/api/pkg/apis/cstor/v1" + "github.com/openebs/api/pkg/apis/types" + openebsclientset "github.com/openebs/api/pkg/client/clientset/versioned" + apis "github.com/openebs/maya/pkg/apis/openebs.io/v1alpha1" + cv "github.com/openebs/maya/pkg/cstor/volume/v1alpha1" + cvr "github.com/openebs/maya/pkg/cstor/volumereplica/v1alpha1" + "github.com/openebs/upgrade/pkg/version" + errors "github.com/pkg/errors" + corev1 "k8s.io/api/core/v1" + storagev1 "k8s.io/api/storage/v1" + k8serrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + k8stypes "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "k8s.io/klog" +) + +var ( + cvcKind = "CStorVolumeConfig" + trueBool = true + cstorCSIDriver = "cstor.csi.openebs.io" + timeStamp = time.Now().UnixNano() / int64(time.Millisecond) + csiProvisionerIdentity = strconv.FormatInt(timeStamp, 10) + "-" + strconv.Itoa(rand.Intn(10000)) + "-" + "cstor.csi.openebs.io" +) + +// VolumeMigrator ... +type VolumeMigrator struct { + // kubeclientset is a standard kubernetes clientset + KubeClientset kubernetes.Interface + // openebsclientset is a openebs custom resource package generated for custom API group. + OpenebsClientset openebsclientset.Interface + PVName string + OpenebsNamespace string + CVNamespace string + StorageClass *storagev1.StorageClass +} + +// Migrate is the interface implementation for +func (v *VolumeMigrator) Migrate(pvName, openebsNamespace string) error { + v.PVName = pvName + v.OpenebsNamespace = openebsNamespace + cfg, err := rest.InClusterConfig() + if err != nil { + return errors.Wrap(err, "error building kubeconfig") + } + v.KubeClientset, err = kubernetes.NewForConfig(cfg) + if err != nil { + return errors.Wrap(err, "error building kubernetes clientset") + } + v.OpenebsClientset, err = openebsclientset.NewForConfig(cfg) + if err != nil { + return errors.Wrap(err, "error building openebs clientset") + } + err = v.validateCVCOperator() + if err != nil { + return err + } + shouldMigrate, err := v.isMigrationRequired() + if err != nil { + return err + } + if shouldMigrate { + err = v.migrate() + if err != nil { + return err + } + } else { + klog.Infof("Volume %s already migrated to csi spec", pvName) + } + return v.deleteTempPolicy() +} + +func (v *VolumeMigrator) isMigrationRequired() (bool, error) { + cvList, err := cv.NewKubeclient().WithNamespace(""). + List(metav1.ListOptions{ + LabelSelector: "openebs.io/persistent-volume=" + v.PVName, + }) + if err != nil && !k8serrors.IsNotFound(err) { + return false, err + } + if err == nil && len(cvList.Items) != 0 { + return true, nil + } + _, err = v.OpenebsClientset.CstorV1(). + CStorVolumes(v.OpenebsNamespace).Get(v.PVName, metav1.GetOptions{}) + if err == nil { + return false, nil + } + return false, err +} + +func (v *VolumeMigrator) deleteTempPolicy() error { + err := v.OpenebsClientset.CstorV1(). + CStorVolumePolicies(v.OpenebsNamespace). + Delete(v.PVName, &metav1.DeleteOptions{}) + if err != nil && !k8serrors.IsNotFound(err) { + return err + } + return nil +} + +func (v *VolumeMigrator) validateCVCOperator() error { + v1110, _ := goversion.NewVersion("1.11.0") + operatorPods, err := v.KubeClientset.CoreV1(). + Pods(v.OpenebsNamespace). + List(metav1.ListOptions{ + LabelSelector: "openebs.io/component-name=cvc-operator", + }) + if err != nil { + return err + } + if len(operatorPods.Items) == 0 { + return fmt.Errorf("cvc operator pod missing") + } + for _, pod := range operatorPods.Items { + operatorVersion := strings.Split(pod.Labels["openebs.io/version"], "-")[0] + vOperator, err := goversion.NewVersion(operatorVersion) + if err != nil { + return errors.Wrap(err, "failed to get cvc operator version") + } + if vOperator.LessThan(v1110) { + return fmt.Errorf("cvc operator is in %s version, please upgrade it to 1.11.0 or above version", + pod.Labels["openebs.io/version"]) + } + } + return nil +} + +// migrate migrates the volume from non-CSI schema to CSI schema +func (v *VolumeMigrator) migrate() error { + var pvObj *corev1.PersistentVolume + pvcObj, pvPresent, err := v.validatePVName() + if err != nil { + return errors.Wrapf(err, "failed to validate pvname") + } + err = v.populateCVNamespace(v.PVName) + if err != nil { + return errors.Wrapf(err, "failed to cv namespace") + } + err = v.createTempPolicy() + if err != nil { + return errors.Wrapf(err, "failed to create temporary policy") + } + if pvPresent { + klog.Infof("Checking volume is not mounted on any application") + pvObj, err = v.IsVolumeMounted(v.PVName) + if err != nil { + return errors.Wrapf(err, "failed to verify mount status for pv {%s}", v.PVName) + } + if pvObj.Spec.PersistentVolumeSource.CSI == nil { + klog.Infof("Retaining PV to migrate into csi volume") + err = v.RetainPV(pvObj) + if err != nil { + return errors.Wrapf(err, "failed to retain pv {%s}", v.PVName) + } + } + err = v.updateStorageClass(pvObj.Name, pvObj.Spec.StorageClassName) + if err != nil { + return errors.Wrapf(err, "failed to update storageclass {%s}", pvObj.Spec.StorageClassName) + } + pvcObj, err = v.migratePVC(pvObj) + if err != nil { + return errors.Wrapf(err, "failed to migrate pvc to csi spec") + } + } else { + klog.Infof("PVC and storageclass already migrated to csi format") + } + v.StorageClass, err = v.KubeClientset.StorageV1(). + StorageClasses().Get(*pvcObj.Spec.StorageClassName, metav1.GetOptions{}) + if err != nil { + return err + } + _, err = v.migratePV(pvcObj) + if err != nil { + return errors.Wrapf(err, "failed to migrate pv to csi spec") + } + err = v.removeOldTarget() + if err != nil { + return errors.Wrapf(err, "failed to remove old target deployment") + } + klog.Infof("Creating CVC to bound the volume and trigger CSI driver") + err = v.createCVC(v.PVName) + if err != nil { + return errors.Wrapf(err, "failed to create cvc") + } + err = v.validateMigratedVolume() + if err != nil { + return errors.Wrapf(err, "failed to validate migrated volume") + } + err = v.patchTargetPodAffinity() + if err != nil { + return errors.Wrap(err, "failed to patch target affinity") + } + err = v.cleanupOldResources() + if err != nil { + return errors.Wrapf(err, "failed to cleanup old volume resources") + } + return nil +} + +func (v *VolumeMigrator) migratePVC(pvObj *corev1.PersistentVolume) (*corev1.PersistentVolumeClaim, error) { + pvcObj, recreateRequired, err := v.generateCSIPVC(pvObj.Name) + if err != nil { + return nil, err + } + if recreateRequired { + err := v.addSkipAnnotationToPVC(pvcObj) + if err != nil { + return nil, errors.Wrap(err, "failed to add skip-validations annotation") + } + klog.Infof("Recreating equivalent CSI PVC") + pvcObj, err = v.RecreatePVC(pvcObj) + if err != nil { + return nil, err + } + } + return pvcObj, nil +} + +func (v *VolumeMigrator) addSkipAnnotationToPVC(pvcObj *corev1.PersistentVolumeClaim) error { + oldPVC, err := v.KubeClientset.CoreV1(). + PersistentVolumeClaims(pvcObj.Namespace).Get(pvcObj.Name, metav1.GetOptions{}) + if err != nil && !k8serrors.IsNotFound(err) { + return err + } + if k8serrors.IsNotFound(err) { + return nil + } + if oldPVC.Annotations["openebs.io/skip-validations"] != "true" { + newPVC := oldPVC.DeepCopy() + newPVC.Annotations["openebs.io/skip-validations"] = "true" + data, err := GetPatchData(oldPVC, newPVC) + if err != nil { + return err + } + _, err = v.KubeClientset.CoreV1().PersistentVolumeClaims(oldPVC.Namespace).Patch( + oldPVC.Name, + k8stypes.StrategicMergePatchType, + data, + ) + } + return err +} + +func (v *VolumeMigrator) migratePV(pvcObj *corev1.PersistentVolumeClaim) (*corev1.PersistentVolume, error) { + pvObj, recreateRequired, err := v.generateCSIPV(pvcObj.Spec.VolumeName, pvcObj) + if err != nil { + return nil, err + } + if recreateRequired { + klog.Infof("Recreating equivalent CSI PV") + _, err = v.RecreatePV(pvObj) + if err != nil { + return nil, err + } + } + return pvObj, nil +} + +func (v *VolumeMigrator) generateCSIPVC(pvName string) (*corev1.PersistentVolumeClaim, bool, error) { + pvObj, err := v.KubeClientset.CoreV1(). + PersistentVolumes(). + Get(pvName, metav1.GetOptions{}) + if err != nil { + return nil, false, err + } + pvcName := pvObj.Spec.ClaimRef.Name + pvcNamespace := pvObj.Spec.ClaimRef.Namespace + pvcObj, err := v.KubeClientset.CoreV1().PersistentVolumeClaims(pvcNamespace). + Get(pvcName, metav1.GetOptions{}) + if err != nil { + if !k8serrors.IsNotFound(err) { + return nil, false, err + } + } + if pvcObj.Annotations["volume.beta.kubernetes.io/storage-provisioner"] != cstorCSIDriver { + klog.Infof("Generating equivalent CSI PVC %s", pvcName) + csiPVC := &corev1.PersistentVolumeClaim{} + csiPVC.Name = pvcName + csiPVC.Namespace = pvcNamespace + csiPVC.Annotations = map[string]string{ + "volume.beta.kubernetes.io/storage-provisioner": cstorCSIDriver, + } + csiPVC.Spec.AccessModes = pvObj.Spec.AccessModes + csiPVC.Spec.Resources.Requests = pvObj.Spec.Capacity + csiPVC.Spec.StorageClassName = &pvObj.Spec.StorageClassName + csiPVC.Spec.VolumeMode = pvObj.Spec.VolumeMode + csiPVC.Spec.VolumeName = pvObj.Name + + return csiPVC, true, nil + } + klog.Infof("pvc already migrated") + return pvcObj, false, nil +} + +func (v *VolumeMigrator) generateCSIPV( + pvName string, + pvcObj *corev1.PersistentVolumeClaim, +) (*corev1.PersistentVolume, bool, error) { + pvObj, err := v.KubeClientset.CoreV1(). + PersistentVolumes(). + Get(pvName, metav1.GetOptions{}) + if err != nil { + if !k8serrors.IsNotFound(err) { + return nil, false, err + } + if k8serrors.IsNotFound(err) { + pvObj, err = v.generateCSIPVFromCV(pvName, pvcObj) + if err != nil { + return nil, false, err + } + return pvObj, true, nil + } + } + if pvObj.Spec.PersistentVolumeSource.CSI == nil { + klog.Infof("Generating equivalent CSI PV %s", v.PVName) + csiPV := &corev1.PersistentVolume{} + csiPV.Name = pvObj.Name + csiPV.Annotations = map[string]string{ + "pv.kubernetes.io/provisioned-by": cstorCSIDriver, + } + csiPV.Spec.AccessModes = pvObj.Spec.AccessModes + csiPV.Spec.ClaimRef = &corev1.ObjectReference{ + APIVersion: pvcObj.APIVersion, + Kind: pvcObj.Kind, + Name: pvcObj.Name, + Namespace: pvcObj.Namespace, + } + csiPV.Spec.Capacity = pvObj.Spec.Capacity + csiPV.Spec.PersistentVolumeSource = corev1.PersistentVolumeSource{ + CSI: &corev1.CSIPersistentVolumeSource{ + Driver: cstorCSIDriver, + FSType: pvObj.Spec.PersistentVolumeSource.ISCSI.FSType, + VolumeHandle: pvObj.Name, + VolumeAttributes: map[string]string{ + "openebs.io/cas-type": "cstor", + "storage.kubernetes.io/csiProvisionerIdentity": csiProvisionerIdentity, + }, + }, + } + csiPV.Spec.PersistentVolumeReclaimPolicy = corev1.PersistentVolumeReclaimDelete + if v.StorageClass.ReclaimPolicy != nil { + csiPV.Spec.PersistentVolumeReclaimPolicy = *v.StorageClass.ReclaimPolicy + } + csiPV.Spec.StorageClassName = pvObj.Spec.StorageClassName + csiPV.Spec.VolumeMode = pvObj.Spec.VolumeMode + return csiPV, true, nil + } + klog.Infof("PV %s already in csi form", pvObj.Name) + return pvObj, false, nil +} + +func (v *VolumeMigrator) generateCSIPVFromCV( + cvName string, + pvcObj *corev1.PersistentVolumeClaim, +) (*corev1.PersistentVolume, error) { + klog.Infof("Generating equivalent CSI PV %s", v.PVName) + cvObj, err := cv.NewKubeclient().WithNamespace(v.CVNamespace). + Get(cvName, metav1.GetOptions{}) + if err != nil { + return nil, err + } + csiPV := &corev1.PersistentVolume{} + csiPV.Name = cvObj.Name + csiPV.Spec.AccessModes = pvcObj.Spec.AccessModes + csiPV.Spec.ClaimRef = &corev1.ObjectReference{ + APIVersion: pvcObj.APIVersion, + Kind: pvcObj.Kind, + Name: pvcObj.Name, + Namespace: pvcObj.Namespace, + } + csiPV.Spec.Capacity = corev1.ResourceList{ + corev1.ResourceStorage: cvObj.Spec.Capacity, + } + csiPV.Spec.PersistentVolumeSource = corev1.PersistentVolumeSource{ + CSI: &corev1.CSIPersistentVolumeSource{ + Driver: cstorCSIDriver, + FSType: cvObj.Annotations["openebs.io/fs-type"], + VolumeHandle: cvObj.Name, + VolumeAttributes: map[string]string{ + "openebs.io/cas-type": "cstor", + "storage.kubernetes.io/csiProvisionerIdentity": csiProvisionerIdentity, + }, + }, + } + csiPV.Spec.PersistentVolumeReclaimPolicy = corev1.PersistentVolumeReclaimDelete + if v.StorageClass.ReclaimPolicy != nil { + csiPV.Spec.PersistentVolumeReclaimPolicy = *v.StorageClass.ReclaimPolicy + } + csiPV.Spec.StorageClassName = v.StorageClass.Name + csiPV.Spec.VolumeMode = pvcObj.Spec.VolumeMode + return csiPV, nil +} + +func (v *VolumeMigrator) createCVC(pvName string) error { + var ( + err error + cvcObj *cstor.CStorVolumeConfig + cvObj *apis.CStorVolume + ) + cvcObj, err = v.OpenebsClientset.CstorV1().CStorVolumeConfigs(v.OpenebsNamespace). + Get(pvName, metav1.GetOptions{}) + if err != nil && !k8serrors.IsNotFound(err) { + return err + } + if k8serrors.IsNotFound(err) { + cvObj, err = cv.NewKubeclient().WithNamespace(v.CVNamespace). + Get(pvName, metav1.GetOptions{}) + if err != nil { + return err + } + cvrList, err := cvr.NewKubeclient().WithNamespace(v.OpenebsNamespace). + List(metav1.ListOptions{ + LabelSelector: "", + }) + if err != nil { + return err + } + if len(cvrList.Items) == 0 { + return errors.Errorf("failed to get cvrs for volume %s", v.PVName) + } + annotations := map[string]string{ + "openebs.io/volumeID": pvName, + "openebs.io/volume-policy": pvName, + } + labels := map[string]string{ + "openebs.io/cstor-pool-cluster": v.StorageClass.Parameters["cstorPoolCluster"], + } + if len(cvObj.Labels["openebs.io/source-volume"]) != 0 { + labels["openebs.io/source-volume"] = cvObj.Labels["openebs.io/source-volume"] + } + finalizer := "cvc.openebs.io/finalizer" + cvcObj = cstor.NewCStorVolumeConfig(). + WithName(cvObj.Name). + WithNamespace(v.OpenebsNamespace). + WithAnnotations(annotations). + WithLabelsNew(labels). + WithFinalizer(finalizer) + cvcObj.Spec.Capacity = corev1.ResourceList{ + corev1.ResourceName(corev1.ResourceStorage): cvObj.Spec.Capacity, + } + cvcObj.Spec.Provision.Capacity = corev1.ResourceList{ + corev1.ResourceName(corev1.ResourceStorage): resource.MustParse(cvrList.Items[0].Spec.Capacity), + } + cvcObj.Spec.Provision.ReplicaCount = cvObj.Spec.ReplicationFactor + cvcObj.Status.Phase = cstor.CStorVolumeConfigPhasePending + cvcObj.VersionDetails = cstor.VersionDetails{ + Desired: version.Current(), + AutoUpgrade: false, + Status: cstor.VersionStatus{ + Current: version.Current(), + DependentsUpgraded: true, + }, + } + if len(cvObj.Labels["openebs.io/source-volume"]) != 0 { + cvcObj.Spec.CStorVolumeSource = cvObj.Labels["openebs.io/source-volume"] + "@" + cvObj.Annotations["openebs.io/snapshot"] + } + _, err = v.OpenebsClientset.CstorV1().CStorVolumeConfigs(v.OpenebsNamespace). + Create(cvcObj) + if err != nil { + return err + } + } + return nil +} + +func (v *VolumeMigrator) patchTargetSVCOwnerRef() error { + svcObj, err := v.KubeClientset.CoreV1(). + Services(v.OpenebsNamespace). + Get(v.PVName, metav1.GetOptions{}) + if err != nil { + return err + } + cvcObj, err := v.OpenebsClientset.CstorV1(). + CStorVolumeConfigs(v.OpenebsNamespace). + Get(v.PVName, metav1.GetOptions{}) + if err != nil { + return err + } + newSVCObj := svcObj.DeepCopy() + newSVCObj.OwnerReferences = []metav1.OwnerReference{ + *metav1.NewControllerRef(cvcObj, + cstor.SchemeGroupVersion.WithKind(cvcKind)), + } + data, err := GetPatchData(svcObj, newSVCObj) + if err != nil { + return err + } + _, err = v.KubeClientset.CoreV1(). + Services(v.OpenebsNamespace). + Patch(v.PVName, k8stypes.StrategicMergePatchType, data) + return err +} + +// updateStorageClass recreates a new storageclass with the csi provisioner +// the older annotations with the casconfig are also preserved for information +// as the information about the storageclass cannot be gathered from other +// resources a temporary storageclass is created before deleting the original +func (v *VolumeMigrator) updateStorageClass(pvName, scName string) error { + var tmpSCObj *storagev1.StorageClass + klog.Infof("Updating storageclass %s with csi parameters", scName) + scObj, err := v.KubeClientset.StorageV1(). + StorageClasses(). + Get(scName, metav1.GetOptions{}) + if err != nil { + if !k8serrors.IsNotFound(err) { + return err + } + } + if scObj == nil || scObj.Provisioner != cstorCSIDriver { + tmpSCObj, err = v.createTmpSC(scName) + if err != nil { + return err + } + replicaCount, err := v.getReplicaCount(pvName) + if err != nil { + return err + } + cspcName, err := v.getCSPCName(pvName) + if err != nil { + return err + } + csiSC := tmpSCObj.DeepCopy() + csiSC.ObjectMeta = metav1.ObjectMeta{ + Name: scName, + Annotations: tmpSCObj.Annotations, + } + csiSC.Provisioner = cstorCSIDriver + csiSC.AllowVolumeExpansion = &trueBool + csiSC.Parameters = map[string]string{ + "cas-type": "cstor", + "replicaCount": replicaCount, + "cstorPoolCluster": cspcName, + } + if scObj != nil { + err = v.KubeClientset.StorageV1(). + StorageClasses().Delete(scObj.Name, &metav1.DeleteOptions{}) + if err != nil { + return err + } + } + scObj, err = v.KubeClientset.StorageV1(). + StorageClasses().Create(csiSC) + if err != nil { + return err + } + v.StorageClass = scObj + + } + err = v.KubeClientset.StorageV1(). + StorageClasses().Delete("tmp-migrate-"+scObj.Name, &metav1.DeleteOptions{}) + if err != nil && !k8serrors.IsNotFound(err) { + return errors.Wrapf(err, "failed to delete temporary storageclass") + } + return nil +} + +func (v *VolumeMigrator) createTmpSC(scName string) (*storagev1.StorageClass, error) { + tmpSCName := "tmp-migrate-" + scName + tmpSCObj, err := v.KubeClientset.StorageV1(). + StorageClasses().Get(tmpSCName, metav1.GetOptions{}) + if err != nil { + if !k8serrors.IsNotFound(err) { + return nil, err + } + scObj, err := v.KubeClientset.StorageV1(). + StorageClasses().Get(scName, metav1.GetOptions{}) + if err != nil { + return nil, err + } + tmpSCObj = scObj.DeepCopy() + tmpSCObj.ObjectMeta = metav1.ObjectMeta{ + Name: tmpSCName, + Annotations: scObj.Annotations, + } + tmpSCObj, err = v.KubeClientset.StorageV1(). + StorageClasses().Create(tmpSCObj) + if err != nil { + return nil, errors.Wrapf(err, "failed to create temporary storageclass") + } + } + return tmpSCObj, nil +} + +func (v *VolumeMigrator) getReplicaCount(pvName string) (string, error) { + cvObj, err := cv.NewKubeclient().WithNamespace(v.CVNamespace). + Get(pvName, metav1.GetOptions{}) + if err != nil { + return "", err + } + return strconv.Itoa(cvObj.Spec.ReplicationFactor), nil +} + +// the cv can be in the pvc namespace or openebs namespace +func (v *VolumeMigrator) populateCVNamespace(cvName string) error { + v.CVNamespace = v.OpenebsNamespace + cvList, err := cv.NewKubeclient().WithNamespace(""). + List(metav1.ListOptions{ + LabelSelector: "openebs.io/persistent-volume=" + v.PVName, + }) + if err != nil { + return err + } + if len(cvList.Items) != 1 { + return errors.Errorf("expected exactly 1 cv for %s, got %d", v.PVName, len(cvList.Items)) + } + for _, cvObj := range cvList.Items { + if cvObj.Name == cvName { + v.CVNamespace = cvObj.Namespace + return nil + } + } + return errors.Errorf("cv %s not found for given pv", cvName) +} + +func (v *VolumeMigrator) getCSPCName(pvName string) (string, error) { + cvrList, err := cvr.NewKubeclient().WithNamespace(v.OpenebsNamespace). + List(metav1.ListOptions{ + LabelSelector: "openebs.io/persistent-volume=" + pvName, + }) + if err != nil { + return "", err + } + if len(cvrList.Items) == 0 { + return "", errors.Errorf("no cvr found for pv %s", pvName) + } + cspiName := cvrList.Items[0].Labels["cstorpoolinstance.openebs.io/name"] + if cspiName == "" { + return "", errors.Errorf("no cspi label found on cvr %s", cvrList.Items[0].Name) + } + lastIndex := strings.LastIndex(cspiName, "-") + return cspiName[:lastIndex], nil +} + +// validatePVName checks whether there exist any pvc for given pv name +// this is required in case the pv gets deleted and only pvc is left +func (v *VolumeMigrator) validatePVName() (*corev1.PersistentVolumeClaim, bool, error) { + var pvcObj *corev1.PersistentVolumeClaim + _, err := v.KubeClientset.CoreV1(). + PersistentVolumes(). + Get(v.PVName, metav1.GetOptions{}) + if err != nil { + if !k8serrors.IsNotFound(err) { + return nil, false, err + } + pvcList, err := v.KubeClientset.CoreV1(). + PersistentVolumeClaims(""). + List(metav1.ListOptions{}) + if err != nil { + return pvcObj, false, err + } + for _, pvcItem := range pvcList.Items { + pvcItem := pvcItem // pin it + if pvcItem.Spec.VolumeName == v.PVName { + pvcObj = &pvcItem + return pvcObj, false, nil + } + } + return pvcObj, false, errors.Errorf("No PVC found for the given PV %s", v.PVName) + } + return pvcObj, true, nil +} + +func (v *VolumeMigrator) removeOldTarget() error { + _, err := v.OpenebsClientset.CstorV1(). + CStorVolumeConfigs(v.OpenebsNamespace). + Get(v.PVName, metav1.GetOptions{}) + if err != nil && !k8serrors.IsNotFound(err) { + return err + } + if k8serrors.IsNotFound(err) { + err = v.KubeClientset.AppsV1(). + Deployments(v.CVNamespace). + Delete(v.PVName+"-target", &metav1.DeleteOptions{}) + if err != nil && !k8serrors.IsNotFound(err) { + return err + } + } + // if cv namespace and openebs namespace is not same + // migrate the target service to openebs namespace + if v.CVNamespace != v.OpenebsNamespace { + err = v.migrateTargetSVC() + } + return nil +} + +func (v *VolumeMigrator) migrateTargetSVC() error { + svcObj, err := v.KubeClientset.CoreV1(). + Services(v.CVNamespace). + Get(v.PVName, metav1.GetOptions{}) + if err != nil && !k8serrors.IsNotFound(err) { + return err + } + if err == nil { + err = v.KubeClientset.CoreV1(). + Services(v.CVNamespace). + Delete(svcObj.Name, &metav1.DeleteOptions{}) + if err != nil { + return err + } + } + // get the target service in openebs namespace + _, err = v.KubeClientset.CoreV1(). + Services(v.OpenebsNamespace). + Get(v.PVName, metav1.GetOptions{}) + if err != nil && !k8serrors.IsNotFound(err) { + return err + } + // if the service is not found in openebs namespace create it + if k8serrors.IsNotFound(err) { + svcObj, err := v.getTargetSVC() + if err != nil { + return err + } + klog.Infof("creating target service %s in %s namespace", svcObj.Name, v.OpenebsNamespace) + svcObj, err = v.KubeClientset.CoreV1().Services(v.OpenebsNamespace).Create(svcObj) + if err != nil { + return err + } + } + return nil +} + +func (v *VolumeMigrator) getTargetSVC() (*corev1.Service, error) { + cvObj, err := cv.NewKubeclient().WithNamespace(v.CVNamespace). + Get(v.PVName, metav1.GetOptions{}) + if err != nil { + return nil, err + } + svcObj := &corev1.Service{} + svcObj.ObjectMeta = metav1.ObjectMeta{ + Name: v.PVName, + Namespace: v.OpenebsNamespace, + Labels: map[string]string{ + "openebs.io/storage-engine-type": "cstor", + "openebs.io/cas-type": "cstor", + "openebs.io/target-service": "cstor-target-svc", + "openebs.io/persistent-volume": v.PVName, + "openebs.io/version": version.Current(), + }, + } + svcObj.Spec = corev1.ServiceSpec{ + ClusterIP: cvObj.Spec.TargetIP, + Ports: []corev1.ServicePort{ + { + Name: "cstor-iscsi", + Port: 3260, + Protocol: "TCP", + TargetPort: intstr.FromInt(3260), + }, + { + Name: "cstor-grpc", + Port: 7777, + Protocol: "TCP", + TargetPort: intstr.FromInt(7777), + }, + { + Name: "mgmt", + Port: 6060, + Protocol: "TCP", + TargetPort: intstr.FromInt(6060), + }, + { + Name: "exporter", + Port: 9500, + Protocol: "TCP", + TargetPort: intstr.FromInt(9500), + }, + }, + Selector: map[string]string{ + "app": "cstor-volume-manager", + "openebs.io/target": "cstor-target", + "openebs.io/persistent-volume": v.PVName, + }, + } + return svcObj, nil +} + +func (v *VolumeMigrator) createTempPolicy() error { + klog.Infof("Checking for a temporary policy of volume %s", v.PVName) + _, err := v.OpenebsClientset.CstorV1(). + CStorVolumePolicies(v.OpenebsNamespace). + Get(v.PVName, metav1.GetOptions{}) + if err == nil { + return nil + } + if !k8serrors.IsNotFound(err) { + return err + } + klog.Infof("Creating temporary policy %s for migration", v.PVName) + targetDeploy, err := v.KubeClientset.AppsV1(). + Deployments(v.CVNamespace). + Get(v.PVName+"-target", metav1.GetOptions{}) + if err != nil { + return err + } + cvObj, err := cv.NewKubeclient().WithNamespace(v.CVNamespace). + Get(v.PVName, metav1.GetOptions{}) + if err != nil { + return err + } + replicas, err := cvr.NewKubeclient(). + WithNamespace(v.OpenebsNamespace). + List(metav1.ListOptions{ + LabelSelector: "openebs.io/persistent-volume=" + v.PVName, + }) + if err != nil { + return err + } + tempPolicy := &cstor.CStorVolumePolicy{ + ObjectMeta: metav1.ObjectMeta{ + Name: v.PVName, + Namespace: v.OpenebsNamespace, + }, + Spec: cstor.CStorVolumePolicySpec{ + Target: cstor.TargetSpec{ + PriorityClassName: targetDeploy.Spec.Template.Spec.PriorityClassName, + Tolerations: targetDeploy.Spec.Template.Spec.Tolerations, + }, + }, + } + if targetDeploy.Spec.Template.Spec.Affinity != nil { + tempPolicy.Spec.Target.PodAffinity = targetDeploy.Spec.Template.Spec.Affinity.PodAffinity + } + if targetDeploy.Spec.Template.Spec.NodeSelector != nil { + tempPolicy.Spec.Target.NodeSelector = targetDeploy.Spec.Template.Spec.NodeSelector + } + if len(replicas.Items) != cvObj.Spec.ReplicationFactor { + return errors.Errorf("failed to get cvrs for volume %s, expected %d got %d", + v.PVName, cvObj.Spec.ReplicationFactor, len(replicas.Items), + ) + } + for _, replica := range replicas.Items { + tempPolicy.Spec.ReplicaPoolInfo = append(tempPolicy.Spec.ReplicaPoolInfo, + cstor.ReplicaPoolInfo{ + PoolName: replica.Labels[cspiNameLabel], + }, + ) + } + _, err = v.OpenebsClientset.CstorV1(). + CStorVolumePolicies(v.OpenebsNamespace). + Create(tempPolicy) + return err +} + +func (v *VolumeMigrator) validateMigratedVolume() error { + klog.Info("Validating the migrated volume") +retry: + cvcObj, err := v.OpenebsClientset.CstorV1(). + CStorVolumeConfigs(v.OpenebsNamespace). + Get(v.PVName, metav1.GetOptions{}) + if err != nil { + return err + } + if cvcObj.Status.Phase != cstor.CStorVolumeConfigPhaseBound { + klog.Infof("Waiting for cvc %s to become Bound, got: %s", v.PVName, cvcObj.Status.Phase) + time.Sleep(10 * time.Second) + goto retry + } + policy, err := v.OpenebsClientset.CstorV1(). + CStorVolumePolicies(v.OpenebsNamespace). + Get(v.PVName, metav1.GetOptions{}) + if err != nil { + return err + } + cvrList, err := v.OpenebsClientset.CstorV1(). + CStorVolumeReplicas(v.OpenebsNamespace). + List(metav1.ListOptions{ + LabelSelector: "openebs.io/persistent-volume=" + v.PVName, + }) + if err != nil { + return err + } + for _, info := range policy.Spec.ReplicaPoolInfo { + found := false + cvrPools := []string{} + for _, replica := range cvrList.Items { + if info.PoolName == replica.Labels[types.CStorPoolInstanceNameLabelKey] { + found = true + } + if len(cvrPools) != len(cvrList.Items) { + cvrPools = append(cvrPools, replica.Labels[types.CStorPoolInstanceLabelKey]) + } + } + if !found { + return errors.Errorf("cvr expected to be scheduled %s pool, but cvrs scheduled on pools %v", + info.PoolName, + cvrPools, + ) + } + } + for { + cvObj, err1 := v.OpenebsClientset.CstorV1(). + CStorVolumes(v.OpenebsNamespace). + Get(v.PVName, metav1.GetOptions{}) + if err1 != nil { + klog.Errorf("failed to get cv %s: %s", v.PVName, err1.Error()) + } else { + if cvObj.Status.Phase == cstor.CStorVolumePhase("Healthy") { + break + } + klog.Infof("Waiting for cv %s to come to Healthy state, got: %s", + v.PVName, cvObj.Status.Phase) + } + time.Sleep(10 * time.Second) + } + klog.Info("Patching the target svc with cvc owner ref") + err = v.patchTargetSVCOwnerRef() + if err != nil { + errors.Wrap(err, "failed to patch cvc owner ref to target svc") + } + return nil +} + +func (v *VolumeMigrator) patchTargetPodAffinity() error { + cvp, err := v.OpenebsClientset.CstorV1(). + CStorVolumePolicies(v.OpenebsNamespace). + Get(v.PVName, metav1.GetOptions{}) + if err != nil { + return err + } + if cvp.Spec.Target.PodAffinity == nil { + return nil + } + klog.Info("Patching target pod with old pod affinity rules") + targetDeploy, err := v.KubeClientset.AppsV1(). + Deployments(v.OpenebsNamespace). + Get(v.PVName+"-target", metav1.GetOptions{}) + if err != nil { + return err + } + newTargetDeploy := targetDeploy.DeepCopy() + if newTargetDeploy.Spec.Template.Spec.Affinity == nil { + newTargetDeploy.Spec.Template.Spec.Affinity = &corev1.Affinity{} + } + newTargetDeploy.Spec.Template.Spec.Affinity.PodAffinity = cvp.Spec.Target.PodAffinity + data, err := GetPatchData(targetDeploy, newTargetDeploy) + if err != nil { + return err + } + _, err = v.KubeClientset.AppsV1(). + Deployments(v.OpenebsNamespace). + Patch(v.PVName+"-target", k8stypes.StrategicMergePatchType, data) + return err +} + +func (v *VolumeMigrator) cleanupOldResources() error { + klog.Info("Cleaning up old volume resources") + cvrList, err := cvr.NewKubeclient(). + WithNamespace(v.OpenebsNamespace). + List(metav1.ListOptions{ + LabelSelector: "openebs.io/persistent-volume=" + v.PVName, + }) + if err != nil { + return errors.Wrapf(err, "failed too list cvrs for %s", v.PVName) + } + for _, replica := range cvrList.Items { + rep := replica // pin it + rep.Finalizers = []string{} + _, err = cvr.NewKubeclient(). + WithNamespace(v.OpenebsNamespace). + Update(&rep) + if err != nil { + return errors.Wrapf(err, "failed to remove finalizer from cvr %s", rep.Name) + } + err = cvr.NewKubeclient(). + WithNamespace(v.OpenebsNamespace). + Delete(replica.Name) + if err != nil { + return err + } + } + cvcObj, err := v.OpenebsClientset.CstorV1(). + CStorVolumeConfigs(v.OpenebsNamespace). + Get(v.PVName, metav1.GetOptions{}) + if err != nil { + return err + } + newCVCObj := cvcObj.DeepCopy() + delete(newCVCObj.Annotations, "openebs.io/volume-policy") + data, err := GetPatchData(cvcObj, newCVCObj) + if err != nil { + return err + } + _, err = v.OpenebsClientset.CstorV1(). + CStorVolumeConfigs(v.OpenebsNamespace). + Patch(v.PVName, k8stypes.MergePatchType, data) + if err != nil { + return err + } + err = cv.NewKubeclient(). + WithNamespace(v.CVNamespace). + Delete(v.PVName) + if err != nil && !k8serrors.IsNotFound(err) { + return err + } + return nil +} diff --git a/pkg/migrate/cstor/volume_operations.go b/pkg/migrate/cstor/volume_operations.go new file mode 100644 index 00000000..db09ea47 --- /dev/null +++ b/pkg/migrate/cstor/volume_operations.go @@ -0,0 +1,154 @@ +/* +Copyright 2020 The OpenEBS Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package migrate + +import ( + "time" + + errors "github.com/pkg/errors" + + corev1 "k8s.io/api/core/v1" + k8serrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/klog" +) + +// IsVolumeMounted checks if the volume is mounted into any pod. +// This check is required as if mounted the pod will not allow +// deleting the pvc for recreation into csi volume. +func (v *VolumeMigrator) IsVolumeMounted(pvName string) (*corev1.PersistentVolume, error) { + pvObj, err := v.KubeClientset.CoreV1(). + PersistentVolumes(). + Get(pvName, metav1.GetOptions{}) + if err != nil { + return nil, err + } + pvcName := pvObj.Spec.ClaimRef.Name + pvcNamespace := pvObj.Spec.ClaimRef.Namespace + podList, err := v.KubeClientset.CoreV1().Pods(pvcNamespace). + List(metav1.ListOptions{}) + if err != nil { + return nil, err + } + for _, podObj := range podList.Items { + for _, volume := range podObj.Spec.Volumes { + if volume.PersistentVolumeClaim != nil { + if volume.PersistentVolumeClaim.ClaimName == pvcName { + return nil, errors.Errorf( + "the volume %s is mounted on %s, please scale down all apps before migrating", + pvName, + podObj.Name, + ) + } + } + } + } + return pvObj, nil +} + +// RetainPV sets the Retain policy on the PV. +// This operation is performed to prevent deletion of the OpenEBS +// resources while deleting the pvc to recreate with migrated spec. +func (v *VolumeMigrator) RetainPV(pvObj *corev1.PersistentVolume) error { + pvObj.Spec.PersistentVolumeReclaimPolicy = corev1.PersistentVolumeReclaimRetain + _, err := v.KubeClientset.CoreV1(). + PersistentVolumes(). + Update(pvObj) + if err != nil { + return err + } + return nil +} + +// RecreatePV recreates PV for the given PV object by first deleting +// the old PV with same name and creating a new PV having claimRef same +// as previous PV except for the uid to avoid any other PVC to claim it. +func (v *VolumeMigrator) RecreatePV(pvObj *corev1.PersistentVolume) (*corev1.PersistentVolume, error) { + err := v.KubeClientset.CoreV1(). + PersistentVolumes(). + Delete(pvObj.Name, &metav1.DeleteOptions{}) + if err != nil && !k8serrors.IsNotFound(err) { + return nil, err + } + err = v.isPVDeletedEventually(pvObj) + if err != nil { + return nil, err + } + pvObj, err = v.KubeClientset.CoreV1(). + PersistentVolumes(). + Create(pvObj) + if err != nil { + return nil, err + } + return pvObj, nil +} + +// RecreatePVC recreates PVC for the given PVC object by first deleting +// the old PVC with same name and creating a new PVC. +func (v *VolumeMigrator) RecreatePVC(pvcObj *corev1.PersistentVolumeClaim) (*corev1.PersistentVolumeClaim, error) { + err := v.KubeClientset.CoreV1(). + PersistentVolumeClaims(pvcObj.Namespace). + Delete(pvcObj.Name, &metav1.DeleteOptions{}) + if err != nil && !k8serrors.IsNotFound(err) { + return nil, err + } + err = v.isPVCDeletedEventually(pvcObj) + if err != nil { + return nil, err + } + pvcObj, err = v.KubeClientset.CoreV1(). + PersistentVolumeClaims(pvcObj.Namespace). + Create(pvcObj) + if err != nil { + return nil, err + } + return pvcObj, nil +} + +// IsPVCDeletedEventually tries to get the deleted pvc +// and returns true if pvc is not found +// else returns false +func (v *VolumeMigrator) isPVCDeletedEventually(pvcObj *corev1.PersistentVolumeClaim) error { + for i := 1; i < 60; i++ { + _, err := v.KubeClientset.CoreV1(). + PersistentVolumeClaims(pvcObj.Namespace). + Get(pvcObj.Name, metav1.GetOptions{}) + if k8serrors.IsNotFound(err) { + return nil + } + klog.Infof("Waiting for pvc %s to go away", pvcObj.Name) + time.Sleep(5 * time.Second) + } + return errors.Errorf("PVC %s still present", pvcObj.Name) +} + +// IsPVDeletedEventually tries to get the deleted pv +// and returns true if pv is not found +// else returns false +func (v *VolumeMigrator) isPVDeletedEventually(pvObj *corev1.PersistentVolume) error { + for i := 1; i < 60; i++ { + _, err := v.KubeClientset.CoreV1(). + PersistentVolumes(). + Get(pvObj.Name, metav1.GetOptions{}) + if k8serrors.IsNotFound(err) { + return nil + } + klog.Infof("Waiting for pv %s to go away", pvObj.Name) + time.Sleep(5 * time.Second) + } + return errors.Errorf("PV %s still present", pvObj.Name) +}