Skip to content

Commit

Permalink
make use of struct instead of passing variables
Browse files Browse the repository at this point in the history
Signed-off-by: shubham <[email protected]>
  • Loading branch information
shubham14bajpai committed May 22, 2020
1 parent 56b7b6f commit 9ce6fc0
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 55 deletions.
24 changes: 12 additions & 12 deletions pkg/migrate/cstor/cspc_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,23 +53,23 @@ func getBDList(cspObj apis.CStorPool) []cstor.CStorPoolInstanceBlockDevice {
return list
}

func getCSPCSpecForSPC(spc *apis.StoragePoolClaim, openebsNamespace string) (*cstor.CStorPoolCluster, error) {
func (c *CSPCMigrator) getCSPCSpecForSPC() (*cstor.CStorPoolCluster, error) {
cspClient := csp.KubeClient()
cspList, err := cspClient.List(metav1.ListOptions{
LabelSelector: string(apis.StoragePoolClaimCPK) + "=" + spc.Name,
LabelSelector: string(apis.StoragePoolClaimCPK) + "=" + c.SPCObj.Name,
})
if err != nil {
return nil, err
}
cspcObj := &cstor.CStorPoolCluster{}
cspcObj.Name = spc.Name
cspcObj.Name = c.SPCObj.Name
cspcObj.Annotations = map[string]string{
// This label will be used to disable reconciliation on the dependants.
// In this case that will be CSPI
types.OpenEBSDisableDependantsReconcileKey: "true",
}
for _, cspObj := range cspList.Items {
cspDeployList, err := deploy.NewKubeClient().WithNamespace(openebsNamespace).
cspDeployList, err := deploy.NewKubeClient().WithNamespace(c.OpenebsNamespace).
List(&metav1.ListOptions{
LabelSelector: "openebs.io/cstor-pool=" + cspObj.Name,
})
Expand Down Expand Up @@ -122,20 +122,20 @@ func getCSPAuxResources(cspDeploy appsv1.Deployment) *corev1.ResourceRequirement
}

// generateCSPC creates an equivalent cspc for the given spc object
func (c *CSPCMigrator) generateCSPC(spcObj *apis.StoragePoolClaim, openebsNamespace string) (
func (c *CSPCMigrator) generateCSPC() (
*cstor.CStorPoolCluster, error) {
cspcObj, err := c.OpenebsClientset.CstorV1().
CStorPoolClusters(openebsNamespace).Get(spcObj.Name, metav1.GetOptions{})
if !k8serrors.IsNotFound(err) {
CStorPoolClusters(c.OpenebsNamespace).Get(c.SPCObj.Name, metav1.GetOptions{})
if !k8serrors.IsNotFound(err) && err != nil {
return nil, err
}
if err != nil {
cspcObj, err = getCSPCSpecForSPC(spcObj, openebsNamespace)
cspcObj, err = c.getCSPCSpecForSPC()
if err != nil {
return nil, err
}
cspcObj, err = c.OpenebsClientset.CstorV1().
CStorPoolClusters(openebsNamespace).Create(cspcObj)
CStorPoolClusters(c.OpenebsNamespace).Create(cspcObj)
if err != nil {
return nil, err
}
Expand All @@ -149,7 +149,7 @@ func (c *CSPCMigrator) generateCSPC(spcObj *apis.StoragePoolClaim, openebsNamesp
Wait(5 * time.Second).
Try(func(attempt uint) error {
cspiList, err1 := c.OpenebsClientset.CstorV1().
CStorPoolInstances(openebsNamespace).List(
CStorPoolInstances(c.OpenebsNamespace).List(
metav1.ListOptions{
LabelSelector: types.CStorPoolClusterLabelKey + "=" + cspcObj.Name,
})
Expand All @@ -168,7 +168,7 @@ func (c *CSPCMigrator) generateCSPC(spcObj *apis.StoragePoolClaim, openebsNamesp
return nil, err
}
cspcObj, err = c.OpenebsClientset.CstorV1().
CStorPoolClusters(openebsNamespace).Get(spcObj.Name, metav1.GetOptions{})
CStorPoolClusters(c.OpenebsNamespace).Get(c.SPCObj.Name, metav1.GetOptions{})
if err != nil {
return nil, err
}
Expand All @@ -178,7 +178,7 @@ func (c *CSPCMigrator) generateCSPC(spcObj *apis.StoragePoolClaim, openebsNamesp
// reconciliation disabled
delete(cspcObj.Annotations, types.OpenEBSDisableDependantsReconcileKey)
cspcObj, err = c.OpenebsClientset.CstorV1().
CStorPoolClusters(openebsNamespace).
CStorPoolClusters(c.OpenebsNamespace).
Update(cspcObj)
if err != nil {
return nil, err
Expand Down
87 changes: 44 additions & 43 deletions pkg/migrate/cstor/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,13 @@ type CSPCMigrator struct {
// openebsclientset is a openebs custom resource package generated for custom API group.
OpenebsClientset openebsclientset.Interface
CSPCObj *cstor.CStorPoolCluster
SPCObj *apis.StoragePoolClaim
OpenebsNamespace string
}

// Migrate ...
func (c *CSPCMigrator) Migrate(name, namespace string) error {
c.OpenebsNamespace = namespace
cfg, err := rest.InClusterConfig()
if err != nil {
return errors.Wrap(err, "error building kubeconfig")
Expand All @@ -77,42 +80,44 @@ func (c *CSPCMigrator) Migrate(name, namespace string) error {
if err != nil {
return errors.Wrap(err, "error building openebs clientset")
}
err = c.migrate(name, namespace)
err = c.migrate(name)
return err
}

// Pool migrates the pool from SPC schema to CSPC schema
func (c *CSPCMigrator) migrate(spcName, openebsNamespace string) error {
spcObj, migrated, err := c.getSPCWithMigrationStatus(spcName, openebsNamespace)
func (c *CSPCMigrator) migrate(spcName string) error {
var err error
var migrated bool
c.SPCObj, migrated, err = c.getSPCWithMigrationStatus(spcName)
if migrated {
klog.Infof("spc %s is already migrated to cspc", spcName)
return nil
}
if err != nil {
return err
}
err = validateSPC(spcObj)
err = c.validateSPC()
if err != nil {
return err
}
err = c.updateBDCLabels(spcName, openebsNamespace)
err = c.updateBDCLabels(spcName)
if err != nil {
return err
}
klog.Infof("Creating equivalent cspc for spc %s", spcName)
cspcObj, err := c.generateCSPC(spcObj, openebsNamespace)
c.CSPCObj, err = c.generateCSPC()
if err != nil {
return err
}
err = c.updateBDCOwnerRef(cspcObj, openebsNamespace)
err = c.updateBDCOwnerRef()
if err != nil {
return err
}
// List all cspi created with reconcile off
cspiList, err := c.OpenebsClientset.CstorV1().
CStorPoolInstances(openebsNamespace).
CStorPoolInstances(c.OpenebsNamespace).
List(metav1.ListOptions{
LabelSelector: string(apis.CStorPoolClusterCPK) + "=" + cspcObj.Name,
LabelSelector: string(apis.CStorPoolClusterCPK) + "=" + c.CSPCObj.Name,
})
if err != nil {
return err
Expand All @@ -126,7 +131,7 @@ func (c *CSPCMigrator) migrate(spcName, openebsNamespace string) error {
cspiItem := cspiItem // pin it
cspiObj := &cspiItem
if cspiObj.Status.Phase != "ONLINE" {
err = c.csptocspi(cspiObj, cspcObj, openebsNamespace)
err = c.csptocspi(cspiObj)
if err != nil {
return err
}
Expand All @@ -144,26 +149,26 @@ func (c *CSPCMigrator) migrate(spcName, openebsNamespace string) error {
// validateSPC determines that if the spc is allowed to migrate or not.
// If the max pool count does not match the number of csp for auto spc, or
// the bd list in spc does not match bds from the csp migration should not be done.
func validateSPC(spcObj *apis.StoragePoolClaim) error {
func (c *CSPCMigrator) validateSPC() error {
cspClient := csp.KubeClient()
cspList, err := cspClient.List(metav1.ListOptions{
LabelSelector: string(apis.StoragePoolClaimCPK) + "=" + spcObj.Name,
LabelSelector: string(apis.StoragePoolClaimCPK) + "=" + c.SPCObj.Name,
})
if err != nil {
return err
}
if spcObj.Spec.BlockDevices.BlockDeviceList == nil {
if spcObj.Spec.MaxPools == nil {
return errors.Errorf("invalid spc %s neither has bdc list nor maxpools", spcObj.Name)
if c.SPCObj.Spec.BlockDevices.BlockDeviceList == nil {
if c.SPCObj.Spec.MaxPools == nil {
return errors.Errorf("invalid spc %s neither has bdc list nor maxpools", c.SPCObj.Name)
}
if *spcObj.Spec.MaxPools != len(cspList.Items) {
if *c.SPCObj.Spec.MaxPools != len(cspList.Items) {
return errors.Errorf("maxpool count does not match csp count expected: %d got: %d",
*spcObj.Spec.MaxPools, len(cspList.Items))
*c.SPCObj.Spec.MaxPools, len(cspList.Items))
}
return nil
}
bdMap := map[string]int{}
for _, bdName := range spcObj.Spec.BlockDevices.BlockDeviceList {
for _, bdName := range c.SPCObj.Spec.BlockDevices.BlockDeviceList {
bdMap[bdName]++
}
for _, cspObj := range cspList.Items {
Expand All @@ -184,15 +189,15 @@ func validateSPC(spcObj *apis.StoragePoolClaim) error {
// getSPCWithMigrationStatus gets the spc by name and verifies if the spc is already
// migrated or not. The spc will not be present in the cluster as the last step
// of migration deletes the spc.
func (c *CSPCMigrator) getSPCWithMigrationStatus(spcName, openebsNamespace string) (*apis.StoragePoolClaim, bool, error) {
func (c *CSPCMigrator) getSPCWithMigrationStatus(spcName string) (*apis.StoragePoolClaim, bool, error) {
spcObj, err := spc.NewKubeClient().
Get(spcName, metav1.GetOptions{})
// verify if the spc is already migrated. If an equivalent cspc exists then
// spc is already migrated as spc is only deleted as last step.
if k8serrors.IsNotFound(err) {
klog.Infof("spc %s not found.", spcName)
_, err = c.OpenebsClientset.CstorV1().
CStorPoolClusters(openebsNamespace).Get(spcName, metav1.GetOptions{})
CStorPoolClusters(c.OpenebsNamespace).Get(spcName, metav1.GetOptions{})
if err != nil {
return nil, false, errors.Wrapf(err, "failed to get equivalent cspc for spc %s", spcName)
}
Expand All @@ -205,21 +210,17 @@ func (c *CSPCMigrator) getSPCWithMigrationStatus(spcName, openebsNamespace strin
}

// csptocspi migrates a CSP to CSPI based on hostname
func (c *CSPCMigrator) csptocspi(
cspiObj *cstor.CStorPoolInstance,
cspcObj *cstor.CStorPoolCluster,
openebsNamespace string,
) error {
hostnameLabel := string(apis.HostNameCPK) + "=" + cspiObj.Labels[string(apis.HostNameCPK)]
spcLabel := string(apis.StoragePoolClaimCPK) + "=" + cspcObj.Name
func (c *CSPCMigrator) csptocspi(cspiObj *cstor.CStorPoolInstance) error {
hostnameLabel := types.HostNameLabelKey + "=" + cspiObj.Labels[types.HostNameLabelKey]
spcLabel := string(apis.StoragePoolClaimCPK) + "=" + c.CSPCObj.Name
cspLabel := hostnameLabel + "," + spcLabel
var err1 error
cspObj, err := getCSP(cspLabel)
if err != nil {
return err
}
klog.Infof("Migrating csp %s to cspi %s", cspiObj.Name, cspObj.Name)
err = c.scaleDownDeployment(cspObj, openebsNamespace)
err = c.scaleDownDeployment(cspObj, c.OpenebsNamespace)
if err != nil {
return err
}
Expand All @@ -228,7 +229,7 @@ func (c *CSPCMigrator) csptocspi(
cspiObj.Annotations[types.OldPoolName] = "cstor-" + string(cspObj.UID)
delete(cspiObj.Annotations, types.OpenEBSDisableReconcileLabelKey)
cspiObj, err = c.OpenebsClientset.CstorV1().
CStorPoolInstances(openebsNamespace).
CStorPoolInstances(c.OpenebsNamespace).
Update(cspiObj)
if err != nil {
return err
Expand All @@ -238,7 +239,7 @@ func (c *CSPCMigrator) csptocspi(
Wait(5 * time.Second).
Try(func(attempt uint) error {
cspiObj, err1 = c.OpenebsClientset.CstorV1().
CStorPoolInstances(openebsNamespace).
CStorPoolInstances(c.OpenebsNamespace).
Get(cspiObj.Name, metav1.GetOptions{})
if err1 != nil {
return err1
Expand All @@ -252,7 +253,7 @@ func (c *CSPCMigrator) csptocspi(
if err != nil {
return err
}
err = c.updateCVRsLabels(cspObj.Name, openebsNamespace, cspiObj)
err = c.updateCVRsLabels(cspiObj)
if err != nil {
return err
}
Expand Down Expand Up @@ -340,8 +341,8 @@ func (c *CSPCMigrator) scaleDownDeployment(cspObj *apis.CStorPool, openebsNamesp

// Update the bdc with the cspc labels instead of spc labels to allow
// filtering of bds claimed by the migrated cspc.
func (c *CSPCMigrator) updateBDCLabels(cspcName, openebsNamespace string) error {
bdcList, err := c.OpenebsClientset.OpenebsV1alpha1().BlockDeviceClaims(openebsNamespace).List(metav1.ListOptions{
func (c *CSPCMigrator) updateBDCLabels(cspcName string) error {
bdcList, err := c.OpenebsClientset.OpenebsV1alpha1().BlockDeviceClaims(c.OpenebsNamespace).List(metav1.ListOptions{
LabelSelector: string(apis.StoragePoolClaimCPK) + "=" + cspcName,
})
if err != nil {
Expand All @@ -359,7 +360,7 @@ func (c *CSPCMigrator) updateBDCLabels(cspcName, openebsNamespace string) error
bdcObj.Finalizers[i] = cspcFinalizer
}
}
_, err := c.OpenebsClientset.OpenebsV1alpha1().BlockDeviceClaims(openebsNamespace).
_, err := c.OpenebsClientset.OpenebsV1alpha1().BlockDeviceClaims(c.OpenebsNamespace).
Update(bdcObj)
if err != nil {
return errors.Wrapf(err, "failed to update bdc %s with cspc label & finalizer", bdcObj.Name)
Expand All @@ -371,11 +372,11 @@ func (c *CSPCMigrator) updateBDCLabels(cspcName, openebsNamespace string) error

// Update the bdc with the cspc OwnerReferences instead of spc OwnerReferences
// to allow clean up of bdcs on deletion of cspc.
func (c *CSPCMigrator) updateBDCOwnerRef(cspcObj *cstor.CStorPoolCluster, openebsNamespace string) error {
func (c *CSPCMigrator) updateBDCOwnerRef() error {
bdcList, err := c.OpenebsClientset.OpenebsV1alpha1().
BlockDeviceClaims(openebsNamespace).
BlockDeviceClaims(c.OpenebsNamespace).
List(metav1.ListOptions{
LabelSelector: types.CStorPoolClusterLabelKey + "=" + cspcObj.Name,
LabelSelector: types.CStorPoolClusterLabelKey + "=" + c.CSPCObj.Name,
})
if err != nil {
return err
Expand All @@ -386,9 +387,9 @@ func (c *CSPCMigrator) updateBDCOwnerRef(cspcObj *cstor.CStorPoolCluster, openeb
bdcObj := &bdcItem
klog.Infof("Updating bdc %s with cspc ownerRef.", bdcObj.Name)
bdcObj.OwnerReferences[0].Kind = "CStorPoolCluster"
bdcObj.OwnerReferences[0].UID = cspcObj.UID
bdcObj.OwnerReferences[0].UID = c.CSPCObj.UID
bdcObj.OwnerReferences[0].APIVersion = "cstor.openebs.io/v1"
_, err := c.OpenebsClientset.OpenebsV1alpha1().BlockDeviceClaims(openebsNamespace).
_, err := c.OpenebsClientset.OpenebsV1alpha1().BlockDeviceClaims(c.OpenebsNamespace).
Update(bdcObj)
if err != nil {
return errors.Wrapf(err, "failed to update bdc %s with cspc onwerRef", bdcObj.Name)
Expand All @@ -400,10 +401,10 @@ func (c *CSPCMigrator) updateBDCOwnerRef(cspcObj *cstor.CStorPoolCluster, openeb

// Update the cvrs on the old csp with the migrated cspi labels and annotations
// to allow backward compatibility with old external provisioned volumes.
func (c *CSPCMigrator) updateCVRsLabels(cspName, openebsNamespace string, cspiObj *cstor.CStorPoolInstance) error {
func (c *CSPCMigrator) updateCVRsLabels(cspiObj *cstor.CStorPoolInstance) error {
cvrList, err := cvr.NewKubeclient().
WithNamespace(openebsNamespace).List(metav1.ListOptions{
LabelSelector: cspNameLabel + "=" + cspName,
WithNamespace(c.OpenebsNamespace).List(metav1.ListOptions{
LabelSelector: cspNameLabel + "=" + c.CSPCObj.Name,
})
if err != nil {
return err
Expand All @@ -419,7 +420,7 @@ func (c *CSPCMigrator) updateCVRsLabels(cspName, openebsNamespace string, cspiOb
cvrObj.Labels[cspiUIDLabel] = string(cspiObj.UID)
delete(cvrObj.Annotations, cspHostnameAnnotation)
cvrObj.Annotations[cspiHostnameAnnotation] = cspiObj.Spec.HostName
_, err = cvr.NewKubeclient().WithNamespace(openebsNamespace).
_, err = cvr.NewKubeclient().WithNamespace(c.OpenebsNamespace).
Update(cvrObj)
if err != nil {
return errors.Wrapf(err, "failed to update cvr %s with cspc info", cvrObj.Name)
Expand Down

0 comments on commit 9ce6fc0

Please sign in to comment.