Skip to content

Commit

Permalink
add cvr check and improve job restarts
Browse files Browse the repository at this point in the history
Signed-off-by: shubham <[email protected]>
  • Loading branch information
shubham14bajpai committed Jun 10, 2020
1 parent f2f82a4 commit 5f5c9e3
Showing 1 changed file with 138 additions and 26 deletions.
164 changes: 138 additions & 26 deletions pkg/migrate/cstor/volume.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

goversion "github.com/hashicorp/go-version"
cstor "github.com/openebs/api/pkg/apis/cstor/v1"
"github.com/openebs/api/pkg/apis/types"
openebsclientset "github.com/openebs/api/pkg/client/clientset/versioned"
apis "github.com/openebs/maya/pkg/apis/openebs.io/v1alpha1"
cv "github.com/openebs/maya/pkg/cstor/volume/v1alpha1"
Expand All @@ -34,8 +35,9 @@ import (
corev1 "k8s.io/api/core/v1"
storagev1 "k8s.io/api/storage/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
k8stypes "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
Expand Down Expand Up @@ -82,7 +84,48 @@ func (v *VolumeMigrator) Migrate(pvName, openebsNamespace string) error {
if err != nil {
return err
}
return v.migrate()
shouldMigrate, err := v.isMigrationRequired()
if err != nil {
return err
}
if shouldMigrate {
err = v.migrate()
if err != nil {
return err
}
} else {
klog.Infof("Volume %s already migrated to csi spec", pvName)
}
return v.deleteTempPolicy()
}

func (v *VolumeMigrator) isMigrationRequired() (bool, error) {
cvList, err := cv.NewKubeclient().WithNamespace("").
List(metav1.ListOptions{
LabelSelector: "openebs.io/persistent-volume=" + v.PVName,
})
if err != nil && !k8serrors.IsNotFound(err) {
return false, err
}
if err == nil && len(cvList.Items) != 0 {
return true, nil
}
_, err = v.OpenebsClientset.CstorV1().
CStorVolumes(v.OpenebsNamespace).Get(v.PVName, metav1.GetOptions{})
if err == nil {
return false, nil
}
return false, err
}

func (v *VolumeMigrator) deleteTempPolicy() error {
err := v.OpenebsClientset.CstorV1().
CStorVolumePolicies(v.OpenebsNamespace).
Delete(v.PVName, &metav1.DeleteOptions{})
if err != nil && !k8serrors.IsNotFound(err) {
return err
}
return nil
}

func (v *VolumeMigrator) validateCVCOperator() error {
Expand Down Expand Up @@ -185,7 +228,6 @@ func (v *VolumeMigrator) migrate() error {
}

func (v *VolumeMigrator) migratePVC(pvObj *corev1.PersistentVolume) (*corev1.PersistentVolumeClaim, error) {
klog.Infof("Generating equivalent CSI PVC")
pvcObj, recreateRequired, err := v.generateCSIPVC(pvObj.Name)
if err != nil {
return nil, err
Expand Down Expand Up @@ -219,15 +261,14 @@ func (v *VolumeMigrator) addSkipAnnotationToPVC(pvcObj *corev1.PersistentVolumeC
}
_, err = v.KubeClientset.CoreV1().PersistentVolumeClaims(oldPVC.Namespace).Patch(
oldPVC.Name,
types.StrategicMergePatchType,
k8stypes.StrategicMergePatchType,
data,
)
}
return err
}

func (v *VolumeMigrator) migratePV(pvcObj *corev1.PersistentVolumeClaim) (*corev1.PersistentVolume, error) {
klog.Infof("Generating equivalent CSI PV")
pvObj, recreateRequired, err := v.generateCSIPV(pvcObj.Spec.VolumeName, pvcObj)
if err != nil {
return nil, err
Expand Down Expand Up @@ -259,6 +300,7 @@ func (v *VolumeMigrator) generateCSIPVC(pvName string) (*corev1.PersistentVolume
}
}
if pvcObj.Annotations["volume.beta.kubernetes.io/storage-provisioner"] != cstorCSIDriver {
klog.Infof("Generating equivalent CSI PVC %s", pvcName)
csiPVC := &corev1.PersistentVolumeClaim{}
csiPVC.Name = pvcName
csiPVC.Namespace = pvcNamespace
Expand Down Expand Up @@ -297,6 +339,7 @@ func (v *VolumeMigrator) generateCSIPV(
}
}
if pvObj.Spec.PersistentVolumeSource.CSI == nil {
klog.Infof("Generating equivalent CSI PV %s", v.PVName)
csiPV := &corev1.PersistentVolume{}
csiPV.Name = pvObj.Name
csiPV.Annotations = map[string]string{
Expand Down Expand Up @@ -337,6 +380,7 @@ func (v *VolumeMigrator) generateCSIPVFromCV(
cvName string,
pvcObj *corev1.PersistentVolumeClaim,
) (*corev1.PersistentVolume, error) {
klog.Infof("Generating equivalent CSI PV %s", v.PVName)
cvObj, err := cv.NewKubeclient().WithNamespace(v.CVNamespace).
Get(cvName, metav1.GetOptions{})
if err != nil {
Expand Down Expand Up @@ -391,6 +435,16 @@ func (v *VolumeMigrator) createCVC(pvName string) error {
if err != nil {
return err
}
cvrList, err := cvr.NewKubeclient().WithNamespace(v.OpenebsNamespace).
List(metav1.ListOptions{
LabelSelector: "",
})
if err != nil {
return err
}
if len(cvrList.Items) == 0 {
return errors.Errorf("failed to get cvrs for volume %s", v.PVName)
}
annotations := map[string]string{
"openebs.io/volumeID": pvName,
"openebs.io/volume-policy": pvName,
Expand All @@ -412,7 +466,7 @@ func (v *VolumeMigrator) createCVC(pvName string) error {
corev1.ResourceName(corev1.ResourceStorage): cvObj.Spec.Capacity,
}
cvcObj.Spec.Provision.Capacity = corev1.ResourceList{
corev1.ResourceName(corev1.ResourceStorage): cvObj.Spec.Capacity,
corev1.ResourceName(corev1.ResourceStorage): resource.MustParse(cvrList.Items[0].Spec.Capacity),
}
cvcObj.Spec.Provision.ReplicaCount = cvObj.Spec.ReplicationFactor
cvcObj.Status.Phase = cstor.CStorVolumeConfigPhasePending
Expand Down Expand Up @@ -460,7 +514,7 @@ func (v *VolumeMigrator) patchTargetSVCOwnerRef() error {
}
_, err = v.KubeClientset.CoreV1().
Services(v.OpenebsNamespace).
Patch(v.PVName, types.StrategicMergePatchType, data)
Patch(v.PVName, k8stypes.StrategicMergePatchType, data)
return err
}

Expand Down Expand Up @@ -519,12 +573,10 @@ func (v *VolumeMigrator) updateStorageClass(pvName, scName string) error {
v.StorageClass = scObj

}
if tmpSCObj != nil {
err = v.KubeClientset.StorageV1().
StorageClasses().Delete(tmpSCObj.Name, &metav1.DeleteOptions{})
if err != nil {
return errors.Wrapf(err, "failed to delete temporary storageclass")
}
err = v.KubeClientset.StorageV1().
StorageClasses().Delete("tmp-migrate-"+scObj.Name, &metav1.DeleteOptions{})
if err != nil && !k8serrors.IsNotFound(err) {
return errors.Wrapf(err, "failed to delete temporary storageclass")
}
return nil
}
Expand Down Expand Up @@ -575,6 +627,9 @@ func (v *VolumeMigrator) populateCVNamespace(cvName string) error {
if err != nil {
return err
}
if len(cvList.Items) != 1 {
return errors.Errorf("expected exactly 1 cv for %s, got %d", v.PVName, len(cvList.Items))
}
for _, cvObj := range cvList.Items {
if cvObj.Name == cvName {
v.CVNamespace = cvObj.Namespace
Expand Down Expand Up @@ -633,12 +688,20 @@ func (v *VolumeMigrator) validatePVName() (*corev1.PersistentVolumeClaim, bool,
}

func (v *VolumeMigrator) removeOldTarget() error {
err := v.KubeClientset.AppsV1().
Deployments(v.CVNamespace).
Delete(v.PVName+"-target", &metav1.DeleteOptions{})
_, err := v.OpenebsClientset.CstorV1().
CStorVolumeConfigs(v.OpenebsNamespace).
Get(v.PVName, metav1.GetOptions{})
if err != nil && !k8serrors.IsNotFound(err) {
return err
}
if k8serrors.IsNotFound(err) {
err = v.KubeClientset.AppsV1().
Deployments(v.CVNamespace).
Delete(v.PVName+"-target", &metav1.DeleteOptions{})
if err != nil && !k8serrors.IsNotFound(err) {
return err
}
}
// if cv namespace and openebs namespace is not same
// migrate the target service to openebs namespace
if v.CVNamespace != v.OpenebsNamespace {
Expand Down Expand Up @@ -740,7 +803,7 @@ func (v *VolumeMigrator) getTargetSVC() (*corev1.Service, error) {
}

func (v *VolumeMigrator) createTempPolicy() error {
klog.Infof("Creating temporary policy %s for migration", v.PVName)
klog.Infof("Checking for a temporary policy of volume %s", v.PVName)
_, err := v.OpenebsClientset.CstorV1().
CStorVolumePolicies(v.OpenebsNamespace).
Get(v.PVName, metav1.GetOptions{})
Expand All @@ -750,12 +813,18 @@ func (v *VolumeMigrator) createTempPolicy() error {
if !k8serrors.IsNotFound(err) {
return err
}
klog.Infof("Creating temporary policy %s for migration", v.PVName)
targetDeploy, err := v.KubeClientset.AppsV1().
Deployments(v.CVNamespace).
Get(v.PVName+"-target", metav1.GetOptions{})
if err != nil {
return err
}
cvObj, err := cv.NewKubeclient().WithNamespace(v.CVNamespace).
Get(v.PVName, metav1.GetOptions{})
if err != nil {
return err
}
replicas, err := cvr.NewKubeclient().
WithNamespace(v.OpenebsNamespace).
List(metav1.ListOptions{
Expand All @@ -782,6 +851,11 @@ func (v *VolumeMigrator) createTempPolicy() error {
if targetDeploy.Spec.Template.Spec.NodeSelector != nil {
tempPolicy.Spec.Target.NodeSelector = targetDeploy.Spec.Template.Spec.NodeSelector
}
if len(replicas.Items) != cvObj.Spec.ReplicationFactor {
return errors.Errorf("failed to get cvrs for volume %s, expected %d got %d",
v.PVName, cvObj.Spec.ReplicationFactor, len(replicas.Items),
)
}
for _, replica := range replicas.Items {
tempPolicy.Spec.ReplicaPoolInfo = append(tempPolicy.Spec.ReplicaPoolInfo,
cstor.ReplicaPoolInfo{
Expand All @@ -797,6 +871,50 @@ func (v *VolumeMigrator) createTempPolicy() error {

func (v *VolumeMigrator) validateMigratedVolume() error {
klog.Info("Validating the migrated volume")
retry:
cvcObj, err := v.OpenebsClientset.CstorV1().
CStorVolumeConfigs(v.OpenebsNamespace).
Get(v.PVName, metav1.GetOptions{})
if err != nil {
return err
}
if cvcObj.Status.Phase != cstor.CStorVolumeConfigPhaseBound {
klog.Infof("Waiting for cvc %s to become Bound, got: %s", v.PVName, cvcObj.Status.Phase)
time.Sleep(10 * time.Second)
goto retry
}
policy, err := v.OpenebsClientset.CstorV1().
CStorVolumePolicies(v.OpenebsNamespace).
Get(v.PVName, metav1.GetOptions{})
if err != nil {
return err
}
cvrList, err := v.OpenebsClientset.CstorV1().
CStorVolumeReplicas(v.OpenebsNamespace).
List(metav1.ListOptions{
LabelSelector: "openebs.io/persistent-volume=" + v.PVName,
})
if err != nil {
return err
}
for _, info := range policy.Spec.ReplicaPoolInfo {
found := false
cvrPools := []string{}
for _, replica := range cvrList.Items {
if info.PoolName == replica.Labels[types.CStorPoolInstanceNameLabelKey] {
found = true
}
if len(cvrPools) != len(cvrList.Items) {
cvrPools = append(cvrPools, replica.Labels[types.CStorPoolInstanceLabelKey])
}
}
if !found {
return errors.Errorf("cvr expected to be scheduled %s pool, but cvrs scheduled on pools %v",
info.PoolName,
cvrPools,
)
}
}
for {
cvObj, err1 := v.OpenebsClientset.CstorV1().
CStorVolumes(v.OpenebsNamespace).
Expand All @@ -813,7 +931,7 @@ func (v *VolumeMigrator) validateMigratedVolume() error {
time.Sleep(10 * time.Second)
}
klog.Info("Patching the target svc with cvc owner ref")
err := v.patchTargetSVCOwnerRef()
err = v.patchTargetSVCOwnerRef()
if err != nil {
errors.Wrap(err, "failed to patch cvc owner ref to target svc")
}
Expand Down Expand Up @@ -848,7 +966,7 @@ func (v *VolumeMigrator) patchTargetPodAffinity() error {
}
_, err = v.KubeClientset.AppsV1().
Deployments(v.OpenebsNamespace).
Patch(v.PVName+"-target", types.StrategicMergePatchType, data)
Patch(v.PVName+"-target", k8stypes.StrategicMergePatchType, data)
return err
}

Expand Down Expand Up @@ -878,12 +996,6 @@ func (v *VolumeMigrator) cleanupOldResources() error {
return err
}
}
err = v.OpenebsClientset.CstorV1().
CStorVolumePolicies(v.OpenebsNamespace).
Delete(v.PVName, &metav1.DeleteOptions{})
if err != nil && !k8serrors.IsNotFound(err) {
return err
}
cvcObj, err := v.OpenebsClientset.CstorV1().
CStorVolumeConfigs(v.OpenebsNamespace).
Get(v.PVName, metav1.GetOptions{})
Expand All @@ -898,7 +1010,7 @@ func (v *VolumeMigrator) cleanupOldResources() error {
}
_, err = v.OpenebsClientset.CstorV1().
CStorVolumeConfigs(v.OpenebsNamespace).
Patch(v.PVName, types.MergePatchType, data)
Patch(v.PVName, k8stypes.MergePatchType, data)
if err != nil {
return err
}
Expand Down

0 comments on commit 5f5c9e3

Please sign in to comment.