From 5f5c9e31813ccc75cafa1737df1504345f382281 Mon Sep 17 00:00:00 2001 From: shubham Date: Wed, 10 Jun 2020 17:06:54 +0530 Subject: [PATCH] add cvr check and improve job restarts Signed-off-by: shubham --- pkg/migrate/cstor/volume.go | 164 ++++++++++++++++++++++++++++++------ 1 file changed, 138 insertions(+), 26 deletions(-) diff --git a/pkg/migrate/cstor/volume.go b/pkg/migrate/cstor/volume.go index f846c5c0..17697ef7 100644 --- a/pkg/migrate/cstor/volume.go +++ b/pkg/migrate/cstor/volume.go @@ -25,6 +25,7 @@ import ( goversion "github.com/hashicorp/go-version" cstor "github.com/openebs/api/pkg/apis/cstor/v1" + "github.com/openebs/api/pkg/apis/types" openebsclientset "github.com/openebs/api/pkg/client/clientset/versioned" apis "github.com/openebs/maya/pkg/apis/openebs.io/v1alpha1" cv "github.com/openebs/maya/pkg/cstor/volume/v1alpha1" @@ -34,8 +35,9 @@ import ( corev1 "k8s.io/api/core/v1" storagev1 "k8s.io/api/storage/v1" k8serrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" + k8stypes "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" @@ -82,7 +84,48 @@ func (v *VolumeMigrator) Migrate(pvName, openebsNamespace string) error { if err != nil { return err } - return v.migrate() + shouldMigrate, err := v.isMigrationRequired() + if err != nil { + return err + } + if shouldMigrate { + err = v.migrate() + if err != nil { + return err + } + } else { + klog.Infof("Volume %s already migrated to csi spec", pvName) + } + return v.deleteTempPolicy() +} + +func (v *VolumeMigrator) isMigrationRequired() (bool, error) { + cvList, err := cv.NewKubeclient().WithNamespace(""). + List(metav1.ListOptions{ + LabelSelector: "openebs.io/persistent-volume=" + v.PVName, + }) + if err != nil && !k8serrors.IsNotFound(err) { + return false, err + } + if err == nil && len(cvList.Items) != 0 { + return true, nil + } + _, err = v.OpenebsClientset.CstorV1(). + CStorVolumes(v.OpenebsNamespace).Get(v.PVName, metav1.GetOptions{}) + if err == nil { + return false, nil + } + return false, err +} + +func (v *VolumeMigrator) deleteTempPolicy() error { + err := v.OpenebsClientset.CstorV1(). + CStorVolumePolicies(v.OpenebsNamespace). + Delete(v.PVName, &metav1.DeleteOptions{}) + if err != nil && !k8serrors.IsNotFound(err) { + return err + } + return nil } func (v *VolumeMigrator) validateCVCOperator() error { @@ -185,7 +228,6 @@ func (v *VolumeMigrator) migrate() error { } func (v *VolumeMigrator) migratePVC(pvObj *corev1.PersistentVolume) (*corev1.PersistentVolumeClaim, error) { - klog.Infof("Generating equivalent CSI PVC") pvcObj, recreateRequired, err := v.generateCSIPVC(pvObj.Name) if err != nil { return nil, err @@ -219,7 +261,7 @@ func (v *VolumeMigrator) addSkipAnnotationToPVC(pvcObj *corev1.PersistentVolumeC } _, err = v.KubeClientset.CoreV1().PersistentVolumeClaims(oldPVC.Namespace).Patch( oldPVC.Name, - types.StrategicMergePatchType, + k8stypes.StrategicMergePatchType, data, ) } @@ -227,7 +269,6 @@ func (v *VolumeMigrator) addSkipAnnotationToPVC(pvcObj *corev1.PersistentVolumeC } func (v *VolumeMigrator) migratePV(pvcObj *corev1.PersistentVolumeClaim) (*corev1.PersistentVolume, error) { - klog.Infof("Generating equivalent CSI PV") pvObj, recreateRequired, err := v.generateCSIPV(pvcObj.Spec.VolumeName, pvcObj) if err != nil { return nil, err @@ -259,6 +300,7 @@ func (v *VolumeMigrator) generateCSIPVC(pvName string) (*corev1.PersistentVolume } } if pvcObj.Annotations["volume.beta.kubernetes.io/storage-provisioner"] != cstorCSIDriver { + klog.Infof("Generating equivalent CSI PVC %s", pvcName) csiPVC := &corev1.PersistentVolumeClaim{} csiPVC.Name = pvcName csiPVC.Namespace = pvcNamespace @@ -297,6 +339,7 @@ func (v *VolumeMigrator) generateCSIPV( } } if pvObj.Spec.PersistentVolumeSource.CSI == nil { + klog.Infof("Generating equivalent CSI PV %s", v.PVName) csiPV := &corev1.PersistentVolume{} csiPV.Name = pvObj.Name csiPV.Annotations = map[string]string{ @@ -337,6 +380,7 @@ func (v *VolumeMigrator) generateCSIPVFromCV( cvName string, pvcObj *corev1.PersistentVolumeClaim, ) (*corev1.PersistentVolume, error) { + klog.Infof("Generating equivalent CSI PV %s", v.PVName) cvObj, err := cv.NewKubeclient().WithNamespace(v.CVNamespace). Get(cvName, metav1.GetOptions{}) if err != nil { @@ -391,6 +435,16 @@ func (v *VolumeMigrator) createCVC(pvName string) error { if err != nil { return err } + cvrList, err := cvr.NewKubeclient().WithNamespace(v.OpenebsNamespace). + List(metav1.ListOptions{ + LabelSelector: "", + }) + if err != nil { + return err + } + if len(cvrList.Items) == 0 { + return errors.Errorf("failed to get cvrs for volume %s", v.PVName) + } annotations := map[string]string{ "openebs.io/volumeID": pvName, "openebs.io/volume-policy": pvName, @@ -412,7 +466,7 @@ func (v *VolumeMigrator) createCVC(pvName string) error { corev1.ResourceName(corev1.ResourceStorage): cvObj.Spec.Capacity, } cvcObj.Spec.Provision.Capacity = corev1.ResourceList{ - corev1.ResourceName(corev1.ResourceStorage): cvObj.Spec.Capacity, + corev1.ResourceName(corev1.ResourceStorage): resource.MustParse(cvrList.Items[0].Spec.Capacity), } cvcObj.Spec.Provision.ReplicaCount = cvObj.Spec.ReplicationFactor cvcObj.Status.Phase = cstor.CStorVolumeConfigPhasePending @@ -460,7 +514,7 @@ func (v *VolumeMigrator) patchTargetSVCOwnerRef() error { } _, err = v.KubeClientset.CoreV1(). Services(v.OpenebsNamespace). - Patch(v.PVName, types.StrategicMergePatchType, data) + Patch(v.PVName, k8stypes.StrategicMergePatchType, data) return err } @@ -519,12 +573,10 @@ func (v *VolumeMigrator) updateStorageClass(pvName, scName string) error { v.StorageClass = scObj } - if tmpSCObj != nil { - err = v.KubeClientset.StorageV1(). - StorageClasses().Delete(tmpSCObj.Name, &metav1.DeleteOptions{}) - if err != nil { - return errors.Wrapf(err, "failed to delete temporary storageclass") - } + err = v.KubeClientset.StorageV1(). + StorageClasses().Delete("tmp-migrate-"+scObj.Name, &metav1.DeleteOptions{}) + if err != nil && !k8serrors.IsNotFound(err) { + return errors.Wrapf(err, "failed to delete temporary storageclass") } return nil } @@ -575,6 +627,9 @@ func (v *VolumeMigrator) populateCVNamespace(cvName string) error { if err != nil { return err } + if len(cvList.Items) != 1 { + return errors.Errorf("expected exactly 1 cv for %s, got %d", v.PVName, len(cvList.Items)) + } for _, cvObj := range cvList.Items { if cvObj.Name == cvName { v.CVNamespace = cvObj.Namespace @@ -633,12 +688,20 @@ func (v *VolumeMigrator) validatePVName() (*corev1.PersistentVolumeClaim, bool, } func (v *VolumeMigrator) removeOldTarget() error { - err := v.KubeClientset.AppsV1(). - Deployments(v.CVNamespace). - Delete(v.PVName+"-target", &metav1.DeleteOptions{}) + _, err := v.OpenebsClientset.CstorV1(). + CStorVolumeConfigs(v.OpenebsNamespace). + Get(v.PVName, metav1.GetOptions{}) if err != nil && !k8serrors.IsNotFound(err) { return err } + if k8serrors.IsNotFound(err) { + err = v.KubeClientset.AppsV1(). + Deployments(v.CVNamespace). + Delete(v.PVName+"-target", &metav1.DeleteOptions{}) + if err != nil && !k8serrors.IsNotFound(err) { + return err + } + } // if cv namespace and openebs namespace is not same // migrate the target service to openebs namespace if v.CVNamespace != v.OpenebsNamespace { @@ -740,7 +803,7 @@ func (v *VolumeMigrator) getTargetSVC() (*corev1.Service, error) { } func (v *VolumeMigrator) createTempPolicy() error { - klog.Infof("Creating temporary policy %s for migration", v.PVName) + klog.Infof("Checking for a temporary policy of volume %s", v.PVName) _, err := v.OpenebsClientset.CstorV1(). CStorVolumePolicies(v.OpenebsNamespace). Get(v.PVName, metav1.GetOptions{}) @@ -750,12 +813,18 @@ func (v *VolumeMigrator) createTempPolicy() error { if !k8serrors.IsNotFound(err) { return err } + klog.Infof("Creating temporary policy %s for migration", v.PVName) targetDeploy, err := v.KubeClientset.AppsV1(). Deployments(v.CVNamespace). Get(v.PVName+"-target", metav1.GetOptions{}) if err != nil { return err } + cvObj, err := cv.NewKubeclient().WithNamespace(v.CVNamespace). + Get(v.PVName, metav1.GetOptions{}) + if err != nil { + return err + } replicas, err := cvr.NewKubeclient(). WithNamespace(v.OpenebsNamespace). List(metav1.ListOptions{ @@ -782,6 +851,11 @@ func (v *VolumeMigrator) createTempPolicy() error { if targetDeploy.Spec.Template.Spec.NodeSelector != nil { tempPolicy.Spec.Target.NodeSelector = targetDeploy.Spec.Template.Spec.NodeSelector } + if len(replicas.Items) != cvObj.Spec.ReplicationFactor { + return errors.Errorf("failed to get cvrs for volume %s, expected %d got %d", + v.PVName, cvObj.Spec.ReplicationFactor, len(replicas.Items), + ) + } for _, replica := range replicas.Items { tempPolicy.Spec.ReplicaPoolInfo = append(tempPolicy.Spec.ReplicaPoolInfo, cstor.ReplicaPoolInfo{ @@ -797,6 +871,50 @@ func (v *VolumeMigrator) createTempPolicy() error { func (v *VolumeMigrator) validateMigratedVolume() error { klog.Info("Validating the migrated volume") +retry: + cvcObj, err := v.OpenebsClientset.CstorV1(). + CStorVolumeConfigs(v.OpenebsNamespace). + Get(v.PVName, metav1.GetOptions{}) + if err != nil { + return err + } + if cvcObj.Status.Phase != cstor.CStorVolumeConfigPhaseBound { + klog.Infof("Waiting for cvc %s to become Bound, got: %s", v.PVName, cvcObj.Status.Phase) + time.Sleep(10 * time.Second) + goto retry + } + policy, err := v.OpenebsClientset.CstorV1(). + CStorVolumePolicies(v.OpenebsNamespace). + Get(v.PVName, metav1.GetOptions{}) + if err != nil { + return err + } + cvrList, err := v.OpenebsClientset.CstorV1(). + CStorVolumeReplicas(v.OpenebsNamespace). + List(metav1.ListOptions{ + LabelSelector: "openebs.io/persistent-volume=" + v.PVName, + }) + if err != nil { + return err + } + for _, info := range policy.Spec.ReplicaPoolInfo { + found := false + cvrPools := []string{} + for _, replica := range cvrList.Items { + if info.PoolName == replica.Labels[types.CStorPoolInstanceNameLabelKey] { + found = true + } + if len(cvrPools) != len(cvrList.Items) { + cvrPools = append(cvrPools, replica.Labels[types.CStorPoolInstanceLabelKey]) + } + } + if !found { + return errors.Errorf("cvr expected to be scheduled %s pool, but cvrs scheduled on pools %v", + info.PoolName, + cvrPools, + ) + } + } for { cvObj, err1 := v.OpenebsClientset.CstorV1(). CStorVolumes(v.OpenebsNamespace). @@ -813,7 +931,7 @@ func (v *VolumeMigrator) validateMigratedVolume() error { time.Sleep(10 * time.Second) } klog.Info("Patching the target svc with cvc owner ref") - err := v.patchTargetSVCOwnerRef() + err = v.patchTargetSVCOwnerRef() if err != nil { errors.Wrap(err, "failed to patch cvc owner ref to target svc") } @@ -848,7 +966,7 @@ func (v *VolumeMigrator) patchTargetPodAffinity() error { } _, err = v.KubeClientset.AppsV1(). Deployments(v.OpenebsNamespace). - Patch(v.PVName+"-target", types.StrategicMergePatchType, data) + Patch(v.PVName+"-target", k8stypes.StrategicMergePatchType, data) return err } @@ -878,12 +996,6 @@ func (v *VolumeMigrator) cleanupOldResources() error { return err } } - err = v.OpenebsClientset.CstorV1(). - CStorVolumePolicies(v.OpenebsNamespace). - Delete(v.PVName, &metav1.DeleteOptions{}) - if err != nil && !k8serrors.IsNotFound(err) { - return err - } cvcObj, err := v.OpenebsClientset.CstorV1(). CStorVolumeConfigs(v.OpenebsNamespace). Get(v.PVName, metav1.GetOptions{}) @@ -898,7 +1010,7 @@ func (v *VolumeMigrator) cleanupOldResources() error { } _, err = v.OpenebsClientset.CstorV1(). CStorVolumeConfigs(v.OpenebsNamespace). - Patch(v.PVName, types.MergePatchType, data) + Patch(v.PVName, k8stypes.MergePatchType, data) if err != nil { return err }