diff --git a/pkg/migrate/cstor/cspc_generator.go b/pkg/migrate/cstor/cspc_generator.go index d5b818fb..0c14a487 100644 --- a/pkg/migrate/cstor/cspc_generator.go +++ b/pkg/migrate/cstor/cspc_generator.go @@ -53,23 +53,23 @@ func getBDList(cspObj apis.CStorPool) []cstor.CStorPoolInstanceBlockDevice { return list } -func getCSPCSpecForSPC(spc *apis.StoragePoolClaim, openebsNamespace string) (*cstor.CStorPoolCluster, error) { +func (c *CSPCMigrator) getCSPCSpecForSPC() (*cstor.CStorPoolCluster, error) { cspClient := csp.KubeClient() cspList, err := cspClient.List(metav1.ListOptions{ - LabelSelector: string(apis.StoragePoolClaimCPK) + "=" + spc.Name, + LabelSelector: string(apis.StoragePoolClaimCPK) + "=" + c.SPCObj.Name, }) if err != nil { return nil, err } cspcObj := &cstor.CStorPoolCluster{} - cspcObj.Name = spc.Name + cspcObj.Name = c.SPCObj.Name cspcObj.Annotations = map[string]string{ // This label will be used to disable reconciliation on the dependants. // In this case that will be CSPI types.OpenEBSDisableDependantsReconcileKey: "true", } for _, cspObj := range cspList.Items { - cspDeployList, err := deploy.NewKubeClient().WithNamespace(openebsNamespace). + cspDeployList, err := deploy.NewKubeClient().WithNamespace(c.OpenebsNamespace). List(&metav1.ListOptions{ LabelSelector: "openebs.io/cstor-pool=" + cspObj.Name, }) @@ -122,20 +122,20 @@ func getCSPAuxResources(cspDeploy appsv1.Deployment) *corev1.ResourceRequirement } // generateCSPC creates an equivalent cspc for the given spc object -func (c *CSPCMigrator) generateCSPC(spcObj *apis.StoragePoolClaim, openebsNamespace string) ( +func (c *CSPCMigrator) generateCSPC() ( *cstor.CStorPoolCluster, error) { cspcObj, err := c.OpenebsClientset.CstorV1(). - CStorPoolClusters(openebsNamespace).Get(spcObj.Name, metav1.GetOptions{}) - if !k8serrors.IsNotFound(err) { + CStorPoolClusters(c.OpenebsNamespace).Get(c.SPCObj.Name, metav1.GetOptions{}) + if !k8serrors.IsNotFound(err) && err != nil { return nil, err } if err != nil { - cspcObj, err = getCSPCSpecForSPC(spcObj, openebsNamespace) + cspcObj, err = c.getCSPCSpecForSPC() if err != nil { return nil, err } cspcObj, err = c.OpenebsClientset.CstorV1(). - CStorPoolClusters(openebsNamespace).Create(cspcObj) + CStorPoolClusters(c.OpenebsNamespace).Create(cspcObj) if err != nil { return nil, err } @@ -149,7 +149,7 @@ func (c *CSPCMigrator) generateCSPC(spcObj *apis.StoragePoolClaim, openebsNamesp Wait(5 * time.Second). Try(func(attempt uint) error { cspiList, err1 := c.OpenebsClientset.CstorV1(). - CStorPoolInstances(openebsNamespace).List( + CStorPoolInstances(c.OpenebsNamespace).List( metav1.ListOptions{ LabelSelector: types.CStorPoolClusterLabelKey + "=" + cspcObj.Name, }) @@ -168,7 +168,7 @@ func (c *CSPCMigrator) generateCSPC(spcObj *apis.StoragePoolClaim, openebsNamesp return nil, err } cspcObj, err = c.OpenebsClientset.CstorV1(). - CStorPoolClusters(openebsNamespace).Get(spcObj.Name, metav1.GetOptions{}) + CStorPoolClusters(c.OpenebsNamespace).Get(c.SPCObj.Name, metav1.GetOptions{}) if err != nil { return nil, err } @@ -178,7 +178,7 @@ func (c *CSPCMigrator) generateCSPC(spcObj *apis.StoragePoolClaim, openebsNamesp // reconciliation disabled delete(cspcObj.Annotations, types.OpenEBSDisableDependantsReconcileKey) cspcObj, err = c.OpenebsClientset.CstorV1(). - CStorPoolClusters(openebsNamespace). + CStorPoolClusters(c.OpenebsNamespace). Update(cspcObj) if err != nil { return nil, err diff --git a/pkg/migrate/cstor/pool.go b/pkg/migrate/cstor/pool.go index 7d657221..c954da56 100644 --- a/pkg/migrate/cstor/pool.go +++ b/pkg/migrate/cstor/pool.go @@ -61,10 +61,13 @@ type CSPCMigrator struct { // openebsclientset is a openebs custom resource package generated for custom API group. OpenebsClientset openebsclientset.Interface CSPCObj *cstor.CStorPoolCluster + SPCObj *apis.StoragePoolClaim + OpenebsNamespace string } // Migrate ... func (c *CSPCMigrator) Migrate(name, namespace string) error { + c.OpenebsNamespace = namespace cfg, err := rest.InClusterConfig() if err != nil { return errors.Wrap(err, "error building kubeconfig") @@ -77,13 +80,15 @@ func (c *CSPCMigrator) Migrate(name, namespace string) error { if err != nil { return errors.Wrap(err, "error building openebs clientset") } - err = c.migrate(name, namespace) + err = c.migrate(name) return err } // Pool migrates the pool from SPC schema to CSPC schema -func (c *CSPCMigrator) migrate(spcName, openebsNamespace string) error { - spcObj, migrated, err := c.getSPCWithMigrationStatus(spcName, openebsNamespace) +func (c *CSPCMigrator) migrate(spcName string) error { + var err error + var migrated bool + c.SPCObj, migrated, err = c.getSPCWithMigrationStatus(spcName) if migrated { klog.Infof("spc %s is already migrated to cspc", spcName) return nil @@ -91,28 +96,28 @@ func (c *CSPCMigrator) migrate(spcName, openebsNamespace string) error { if err != nil { return err } - err = validateSPC(spcObj) + err = c.validateSPC() if err != nil { return err } - err = c.updateBDCLabels(spcName, openebsNamespace) + err = c.updateBDCLabels(spcName) if err != nil { return err } klog.Infof("Creating equivalent cspc for spc %s", spcName) - cspcObj, err := c.generateCSPC(spcObj, openebsNamespace) + c.CSPCObj, err = c.generateCSPC() if err != nil { return err } - err = c.updateBDCOwnerRef(cspcObj, openebsNamespace) + err = c.updateBDCOwnerRef() if err != nil { return err } // List all cspi created with reconcile off cspiList, err := c.OpenebsClientset.CstorV1(). - CStorPoolInstances(openebsNamespace). + CStorPoolInstances(c.OpenebsNamespace). List(metav1.ListOptions{ - LabelSelector: string(apis.CStorPoolClusterCPK) + "=" + cspcObj.Name, + LabelSelector: string(apis.CStorPoolClusterCPK) + "=" + c.CSPCObj.Name, }) if err != nil { return err @@ -126,7 +131,7 @@ func (c *CSPCMigrator) migrate(spcName, openebsNamespace string) error { cspiItem := cspiItem // pin it cspiObj := &cspiItem if cspiObj.Status.Phase != "ONLINE" { - err = c.csptocspi(cspiObj, cspcObj, openebsNamespace) + err = c.csptocspi(cspiObj) if err != nil { return err } @@ -144,26 +149,26 @@ func (c *CSPCMigrator) migrate(spcName, openebsNamespace string) error { // validateSPC determines that if the spc is allowed to migrate or not. // If the max pool count does not match the number of csp for auto spc, or // the bd list in spc does not match bds from the csp migration should not be done. -func validateSPC(spcObj *apis.StoragePoolClaim) error { +func (c *CSPCMigrator) validateSPC() error { cspClient := csp.KubeClient() cspList, err := cspClient.List(metav1.ListOptions{ - LabelSelector: string(apis.StoragePoolClaimCPK) + "=" + spcObj.Name, + LabelSelector: string(apis.StoragePoolClaimCPK) + "=" + c.SPCObj.Name, }) if err != nil { return err } - if spcObj.Spec.BlockDevices.BlockDeviceList == nil { - if spcObj.Spec.MaxPools == nil { - return errors.Errorf("invalid spc %s neither has bdc list nor maxpools", spcObj.Name) + if c.SPCObj.Spec.BlockDevices.BlockDeviceList == nil { + if c.SPCObj.Spec.MaxPools == nil { + return errors.Errorf("invalid spc %s neither has bdc list nor maxpools", c.SPCObj.Name) } - if *spcObj.Spec.MaxPools != len(cspList.Items) { + if *c.SPCObj.Spec.MaxPools != len(cspList.Items) { return errors.Errorf("maxpool count does not match csp count expected: %d got: %d", - *spcObj.Spec.MaxPools, len(cspList.Items)) + *c.SPCObj.Spec.MaxPools, len(cspList.Items)) } return nil } bdMap := map[string]int{} - for _, bdName := range spcObj.Spec.BlockDevices.BlockDeviceList { + for _, bdName := range c.SPCObj.Spec.BlockDevices.BlockDeviceList { bdMap[bdName]++ } for _, cspObj := range cspList.Items { @@ -184,7 +189,7 @@ func validateSPC(spcObj *apis.StoragePoolClaim) error { // getSPCWithMigrationStatus gets the spc by name and verifies if the spc is already // migrated or not. The spc will not be present in the cluster as the last step // of migration deletes the spc. -func (c *CSPCMigrator) getSPCWithMigrationStatus(spcName, openebsNamespace string) (*apis.StoragePoolClaim, bool, error) { +func (c *CSPCMigrator) getSPCWithMigrationStatus(spcName string) (*apis.StoragePoolClaim, bool, error) { spcObj, err := spc.NewKubeClient(). Get(spcName, metav1.GetOptions{}) // verify if the spc is already migrated. If an equivalent cspc exists then @@ -192,7 +197,7 @@ func (c *CSPCMigrator) getSPCWithMigrationStatus(spcName, openebsNamespace strin if k8serrors.IsNotFound(err) { klog.Infof("spc %s not found.", spcName) _, err = c.OpenebsClientset.CstorV1(). - CStorPoolClusters(openebsNamespace).Get(spcName, metav1.GetOptions{}) + CStorPoolClusters(c.OpenebsNamespace).Get(spcName, metav1.GetOptions{}) if err != nil { return nil, false, errors.Wrapf(err, "failed to get equivalent cspc for spc %s", spcName) } @@ -205,13 +210,9 @@ func (c *CSPCMigrator) getSPCWithMigrationStatus(spcName, openebsNamespace strin } // csptocspi migrates a CSP to CSPI based on hostname -func (c *CSPCMigrator) csptocspi( - cspiObj *cstor.CStorPoolInstance, - cspcObj *cstor.CStorPoolCluster, - openebsNamespace string, -) error { - hostnameLabel := string(apis.HostNameCPK) + "=" + cspiObj.Labels[string(apis.HostNameCPK)] - spcLabel := string(apis.StoragePoolClaimCPK) + "=" + cspcObj.Name +func (c *CSPCMigrator) csptocspi(cspiObj *cstor.CStorPoolInstance) error { + hostnameLabel := types.HostNameLabelKey + "=" + cspiObj.Labels[types.HostNameLabelKey] + spcLabel := string(apis.StoragePoolClaimCPK) + "=" + c.CSPCObj.Name cspLabel := hostnameLabel + "," + spcLabel var err1 error cspObj, err := getCSP(cspLabel) @@ -219,7 +220,7 @@ func (c *CSPCMigrator) csptocspi( return err } klog.Infof("Migrating csp %s to cspi %s", cspiObj.Name, cspObj.Name) - err = c.scaleDownDeployment(cspObj, openebsNamespace) + err = c.scaleDownDeployment(cspObj, c.OpenebsNamespace) if err != nil { return err } @@ -228,7 +229,7 @@ func (c *CSPCMigrator) csptocspi( cspiObj.Annotations[types.OldPoolName] = "cstor-" + string(cspObj.UID) delete(cspiObj.Annotations, types.OpenEBSDisableReconcileLabelKey) cspiObj, err = c.OpenebsClientset.CstorV1(). - CStorPoolInstances(openebsNamespace). + CStorPoolInstances(c.OpenebsNamespace). Update(cspiObj) if err != nil { return err @@ -238,7 +239,7 @@ func (c *CSPCMigrator) csptocspi( Wait(5 * time.Second). Try(func(attempt uint) error { cspiObj, err1 = c.OpenebsClientset.CstorV1(). - CStorPoolInstances(openebsNamespace). + CStorPoolInstances(c.OpenebsNamespace). Get(cspiObj.Name, metav1.GetOptions{}) if err1 != nil { return err1 @@ -252,7 +253,7 @@ func (c *CSPCMigrator) csptocspi( if err != nil { return err } - err = c.updateCVRsLabels(cspObj.Name, openebsNamespace, cspiObj) + err = c.updateCVRsLabels(cspiObj) if err != nil { return err } @@ -340,8 +341,8 @@ func (c *CSPCMigrator) scaleDownDeployment(cspObj *apis.CStorPool, openebsNamesp // Update the bdc with the cspc labels instead of spc labels to allow // filtering of bds claimed by the migrated cspc. -func (c *CSPCMigrator) updateBDCLabels(cspcName, openebsNamespace string) error { - bdcList, err := c.OpenebsClientset.OpenebsV1alpha1().BlockDeviceClaims(openebsNamespace).List(metav1.ListOptions{ +func (c *CSPCMigrator) updateBDCLabels(cspcName string) error { + bdcList, err := c.OpenebsClientset.OpenebsV1alpha1().BlockDeviceClaims(c.OpenebsNamespace).List(metav1.ListOptions{ LabelSelector: string(apis.StoragePoolClaimCPK) + "=" + cspcName, }) if err != nil { @@ -359,7 +360,7 @@ func (c *CSPCMigrator) updateBDCLabels(cspcName, openebsNamespace string) error bdcObj.Finalizers[i] = cspcFinalizer } } - _, err := c.OpenebsClientset.OpenebsV1alpha1().BlockDeviceClaims(openebsNamespace). + _, err := c.OpenebsClientset.OpenebsV1alpha1().BlockDeviceClaims(c.OpenebsNamespace). Update(bdcObj) if err != nil { return errors.Wrapf(err, "failed to update bdc %s with cspc label & finalizer", bdcObj.Name) @@ -371,11 +372,11 @@ func (c *CSPCMigrator) updateBDCLabels(cspcName, openebsNamespace string) error // Update the bdc with the cspc OwnerReferences instead of spc OwnerReferences // to allow clean up of bdcs on deletion of cspc. -func (c *CSPCMigrator) updateBDCOwnerRef(cspcObj *cstor.CStorPoolCluster, openebsNamespace string) error { +func (c *CSPCMigrator) updateBDCOwnerRef() error { bdcList, err := c.OpenebsClientset.OpenebsV1alpha1(). - BlockDeviceClaims(openebsNamespace). + BlockDeviceClaims(c.OpenebsNamespace). List(metav1.ListOptions{ - LabelSelector: types.CStorPoolClusterLabelKey + "=" + cspcObj.Name, + LabelSelector: types.CStorPoolClusterLabelKey + "=" + c.CSPCObj.Name, }) if err != nil { return err @@ -386,9 +387,9 @@ func (c *CSPCMigrator) updateBDCOwnerRef(cspcObj *cstor.CStorPoolCluster, openeb bdcObj := &bdcItem klog.Infof("Updating bdc %s with cspc ownerRef.", bdcObj.Name) bdcObj.OwnerReferences[0].Kind = "CStorPoolCluster" - bdcObj.OwnerReferences[0].UID = cspcObj.UID + bdcObj.OwnerReferences[0].UID = c.CSPCObj.UID bdcObj.OwnerReferences[0].APIVersion = "cstor.openebs.io/v1" - _, err := c.OpenebsClientset.OpenebsV1alpha1().BlockDeviceClaims(openebsNamespace). + _, err := c.OpenebsClientset.OpenebsV1alpha1().BlockDeviceClaims(c.OpenebsNamespace). Update(bdcObj) if err != nil { return errors.Wrapf(err, "failed to update bdc %s with cspc onwerRef", bdcObj.Name) @@ -400,10 +401,10 @@ func (c *CSPCMigrator) updateBDCOwnerRef(cspcObj *cstor.CStorPoolCluster, openeb // Update the cvrs on the old csp with the migrated cspi labels and annotations // to allow backward compatibility with old external provisioned volumes. -func (c *CSPCMigrator) updateCVRsLabels(cspName, openebsNamespace string, cspiObj *cstor.CStorPoolInstance) error { +func (c *CSPCMigrator) updateCVRsLabels(cspiObj *cstor.CStorPoolInstance) error { cvrList, err := cvr.NewKubeclient(). - WithNamespace(openebsNamespace).List(metav1.ListOptions{ - LabelSelector: cspNameLabel + "=" + cspName, + WithNamespace(c.OpenebsNamespace).List(metav1.ListOptions{ + LabelSelector: cspNameLabel + "=" + c.CSPCObj.Name, }) if err != nil { return err @@ -419,7 +420,7 @@ func (c *CSPCMigrator) updateCVRsLabels(cspName, openebsNamespace string, cspiOb cvrObj.Labels[cspiUIDLabel] = string(cspiObj.UID) delete(cvrObj.Annotations, cspHostnameAnnotation) cvrObj.Annotations[cspiHostnameAnnotation] = cspiObj.Spec.HostName - _, err = cvr.NewKubeclient().WithNamespace(openebsNamespace). + _, err = cvr.NewKubeclient().WithNamespace(c.OpenebsNamespace). Update(cvrObj) if err != nil { return errors.Wrapf(err, "failed to update cvr %s with cspc info", cvrObj.Name)