Skip to content

Commit

Permalink
Enabling remote restore in different namespace
Browse files Browse the repository at this point in the history
Signed-off-by: mayank <[email protected]>
  • Loading branch information
mayank committed Apr 24, 2020
1 parent 837917c commit bed99ce
Show file tree
Hide file tree
Showing 6 changed files with 199 additions and 94 deletions.
95 changes: 44 additions & 51 deletions pkg/cstor/cstor.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
*/
v1alpha1 "github.com/openebs/maya/pkg/apis/openebs.io/v1alpha1"
openebs "github.com/openebs/maya/pkg/client/generated/clientset/versioned"
velero "github.com/openebs/velero-plugin/pkg/velero"
"github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
Expand Down Expand Up @@ -57,6 +58,9 @@ const (

// LocalSnapshot config key for local snapshot
LocalSnapshot = "local"

// SnapshotIDIdentifier is a word to generate snapshotID from volume name and backup name
SnapshotIDIdentifier = "-velero-bkp-"
)

// Plugin defines snapshot plugin for CStor volume
Expand Down Expand Up @@ -209,6 +213,10 @@ func (p *Plugin) Init(config map[string]string) error {
return nil
}

if err := velero.InitializeClientSet(conf); err != nil {
return errors.Wrapf(err, "failed to initialize velero clientSet")
}

p.cl = &cloud.Conn{Log: p.Log}
return p.cl.Init(config)
}
Expand Down Expand Up @@ -349,7 +357,7 @@ func (p *Plugin) CreateSnapshot(volumeID, volumeAZ string, tags map[string]strin

if p.local {
// local snapshot
return volumeID + "-velero-bkp-" + bkpname, nil
return generateSnapshotID(volumeID, bkpname), nil
}

filename := p.cl.GenerateRemoteFilename(vol.snapshotTag, vol.backupName)
Expand All @@ -365,15 +373,14 @@ func (p *Plugin) CreateSnapshot(volumeID, volumeAZ string, tags map[string]strin
}

if vol.backupStatus == v1alpha1.BKPCStorStatusDone {
return volumeID + "-velero-bkp-" + bkpname, nil
return generateSnapshotID(volumeID, bkpname), nil
}

return "", errors.Errorf("Failed to upload snapshot, status:{%v}", vol.backupStatus)
}

func (p *Plugin) getSnapInfo(snapshotID string) (*Snapshot, error) {
s := strings.Split(snapshotID, "-velero-bkp-")
volumeID := s[0]
bkpName := s[1]
volumeID, bkpName := getInfoFromSnapshotID(snapshotID)

pv, err := p.K8sClient.
CoreV1().
Expand All @@ -397,32 +404,39 @@ func (p *Plugin) getSnapInfo(snapshotID string) (*Snapshot, error) {
// CreateVolumeFromSnapshot create CStor volume for given
// snapshotID and perform restore operation on it
func (p *Plugin) CreateVolumeFromSnapshot(snapshotID, volumeType, volumeAZ string, iops *int64) (string, error) {
var (
newVol *Volume
err error
)

if volumeType != "cstor-snapshot" {
return "", errors.Errorf("Invalid volume type{%s}", volumeType)
}

s := strings.Split(snapshotID, "-velero-bkp-")
volumeID := s[0]
snapName := s[1]
volumeID, snapName := getInfoFromSnapshotID(snapshotID)

snapType := "cloud"
if p.local {
snapType = "local"
}

p.Log.Infof("Restoring %s snapshot{%s} for volume:%s", snapType, snapName, volumeID)
p.Log.Infof("Restoring %s snapshot{%s} for volume:%s localBackup:%s", snapType, snapName, volumeID, p.local)

newVol, err := p.getVolInfo(volumeID, snapName)
if err != nil {
return "", errors.Wrapf(err, "Failed to read PVC for volumeID=%s snap=%s", volumeID, snapName)
}

fn := p.restoreVolumeFromCloud
if p.local {
fn = p.restoreVolumeFromLocal
}
newVol, err = p.getVolumeForLocalRestore(volumeID, snapName)
if err != nil {
return "", errors.Wrapf(err, "Failed to read PVC for volumeID=%s snap=%s", volumeID, snapName)
}

err = fn(newVol)
err = p.restoreVolumeFromLocal(newVol)
} else {
newVol, err = p.getVolumeForRemoteRestore(volumeID, snapName)
if err != nil {
return "", errors.Wrapf(err, "Failed to read PVC for volumeID=%s snap=%s", volumeID, snapName)
}

err = p.restoreVolumeFromCloud(newVol)
}

if err != nil {
p.Log.Errorf("Failed to restore volume : %s", err)
Expand Down Expand Up @@ -473,39 +487,6 @@ func (p *Plugin) SetVolumeID(unstructuredPV runtime.Unstructured, volumeID strin
return &unstructured.Unstructured{Object: res}, nil
}

func (p *Plugin) getVolInfo(volumeID, snapName string) (*Volume, error) {
// To support namespace re-mapping for cloud snapshot,
// change createPVC to getPVCInfo
fn := p.createPVC
if p.local {
fn = p.getPVInfo
}

vol, err := fn(volumeID, snapName)
if err != nil {
return nil, err
}

// To support namespace re-mapping for cloud-snapshot remove below check
if p.local {
// Let's rename PV if already created
// -- PV may exist in-case of namespace-remapping or stale PV
newVolName, err := p.generateRestorePVName(vol.volname)
if err != nil {
return nil, errors.Wrapf(err, "Failed to generate PV name")
}

delete(p.volumes, vol.volname)
vol.volname = newVolName
}

p.volumes[vol.volname] = vol

p.Log.Infof("Generated PV name is %s", vol.volname)

return vol, nil
}

// getScheduleName return the schedule name for the given backup
// It will check if backup name have 'bkp-20060102150405' format
func (p *Plugin) getScheduleName(backupName string) string {
Expand All @@ -524,3 +505,15 @@ func (p *Plugin) getScheduleName(backupName string) string {
}
return scheduleOrBackupName
}

// getInfoFromSnapshotID return backup name and volume id from the given snapshotID
func getInfoFromSnapshotID(snapshotID string) (volumeID, backupName string) {
s := strings.Split(snapshotID, SnapshotIDIdentifier)
volumeID = s[0]
backupName = s[1]
return
}

func generateSnapshotID(volumeID, backupName string) string {
return volumeID + SnapshotIDIdentifier + backupName
}
75 changes: 48 additions & 27 deletions pkg/cstor/pv_operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,14 @@ import (
v1alpha1 "github.com/openebs/maya/pkg/apis/openebs.io/v1alpha1"
"github.com/pkg/errors"
v1 "k8s.io/api/core/v1"
apierror "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

const (
// PvClonePrefix prefix for clone volume in case restore from local backup
PvClonePrefix = "cstor-clone-"
)

func (p *Plugin) updateVolCASInfo(data []byte, volumeID string) error {
var cas v1alpha1.CASVolume

Expand Down Expand Up @@ -60,40 +64,40 @@ func (p *Plugin) restoreVolumeFromCloud(vol *Volume) error {
return nil
}

func (p *Plugin) generateRestorePVName(volumeID string) (string, error) {
_, err := p.K8sClient.
func (p *Plugin) getPV(volumeID string) (*v1.PersistentVolume, error) {
return p.K8sClient.
CoreV1().
PersistentVolumes().
Get(volumeID, metav1.GetOptions{})
}

func (p *Plugin) restoreVolumeFromLocal(vol *Volume) error {
_, err := p.sendRestoreRequest(vol)
if err != nil {
if apierror.IsNotFound(err) {
return volumeID, nil
}
return "", errors.Wrapf(err, "Error checking if PV with same name exist")
return errors.Wrapf(err, "Restore request to apiServer failed")
}
vol.restoreStatus = v1alpha1.RSTCStorStatusDone
return nil
}

nuuid, err := uuid.NewV4()
// getVolumeForLocalRestore return volume information to restore locally for the given volumeID and snapName
// volumeID : pv name from backup
// snapName : snapshot name from where new volume will be created
func (p *Plugin) getVolumeForLocalRestore(volumeID, snapName string) (*Volume, error) {
pv, err := p.getPV(volumeID)
if err != nil {
return "", errors.Wrapf(err, "Error generating uuid for PV rename")
return nil, errors.Wrapf(err, "error fetching PV=%s", volumeID)
}

oldVolumeID, volumeID := volumeID, "cstor-clone-"+nuuid.String()
p.Log.Infof("Renaming PV %s to %s", oldVolumeID, volumeID)
return volumeID, nil
}

func (p *Plugin) getPVInfo(volumeID, snapName string) (*Volume, error) {
pv, err := p.K8sClient.
CoreV1().
PersistentVolumes().
Get(volumeID, metav1.GetOptions{})
clonePvName, err := generateClonePVName()
if err != nil {
return nil, errors.Errorf("Error fetching volume{%s} : %s", volumeID, err.Error())
return nil, err
}
p.Log.Infof("Renaming PV %s to %s", pv.Name, clonePvName)

vol := &Volume{
volname: volumeID,
srcVolname: volumeID,
volname: clonePvName,
srcVolname: pv.Name,
backupName: snapName,
storageClass: pv.Spec.StorageClassName,
size: pv.Spec.Capacity[v1.ResourceStorage],
Expand All @@ -102,11 +106,28 @@ func (p *Plugin) getPVInfo(volumeID, snapName string) (*Volume, error) {
return vol, nil
}

func (p *Plugin) restoreVolumeFromLocal(vol *Volume) error {
_, err := p.sendRestoreRequest(vol)
// getVolumeForRemoteRestore return volume information to restore from remote backup for the given volumeID and snapName
// volumeID : pv name from backup
// snapName : snapshot name from where new volume will be created
func (p *Plugin) getVolumeForRemoteRestore(volumeID, snapName string) (*Volume, error) {
vol, err := p.createPVC(volumeID, snapName)
if err != nil {
return errors.Wrapf(err, "Restore request to apiServer failed")
return nil, err
}
vol.restoreStatus = v1alpha1.RSTCStorStatusDone
return nil

p.volumes[vol.volname] = vol

p.Log.Infof("Generated PV name is %s", vol.volname)

return vol, nil
}

// generateClonePVName return new name for clone pv for the given pv
func generateClonePVName() (string, error) {
nuuid, err := uuid.NewV4()
if err != nil {
return "", errors.Wrapf(err, "Error generating uuid for PV rename")
}

return PvClonePrefix + nuuid.String(), nil
}
44 changes: 28 additions & 16 deletions pkg/cstor/pvc_operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"encoding/json"
"time"

velero "github.com/openebs/velero-plugin/pkg/velero"
"github.com/pkg/errors"
v1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -89,35 +90,30 @@ func (p *Plugin) backupPVC(volumeID string) error {

// createPVC create PVC for given volume name
func (p *Plugin) createPVC(volumeID, snapName string) (*Volume, error) {
pvc := &v1.PersistentVolumeClaim{}
var vol *Volume
var data []byte
var ok bool

filename := p.cl.GenerateRemoteFilename(volumeID, snapName)
if filename == "" {
return nil, errors.New("error creating remote file name for pvc backup")
pvc, err := p.downloadPVC(volumeID, snapName)
if err != nil {
return nil, errors.Wrapf(err, "failed to download pvc")
}

if data, ok = p.cl.Read(filename + ".pvc"); !ok {
return nil, errors.Errorf("Failed to download PVC file=%s", filename+".pvc")
targetedNs, err := velero.GetRestoreNamespace(pvc.Namespace, snapName, p.Log)
if err != nil {
return nil, err
}
pvc.Namespace = targetedNs

if err := json.Unmarshal(data, pvc); err != nil {
return nil, errors.Errorf("Failed to decode pvc file=%s", filename+".pvc")
newVol, err := p.getVolumeFromPVC(*pvc)
if err != nil {
return nil, err
}

newVol, err := p.getVolumeFromPVC(*pvc)
if newVol != nil {
newVol.backupName = snapName
newVol.snapshotTag = volumeID
return newVol, nil
}

if err != nil {
return nil, err
}

p.Log.Infof("Creating PVC for volumeID:%s snapshot:%s", volumeID, snapName)

pvc.Annotations = make(map[string]string)
Expand Down Expand Up @@ -200,7 +196,6 @@ func (p *Plugin) getPVCInfo(volumeID, snapName string) (*Volume, error) {
return vol, nil
}

// nolint: unused
// getVolumeFromPVC returns volume info for given PVC if PVC is in bound state
func (p *Plugin) getVolumeFromPVC(pvc v1.PersistentVolumeClaim) (*Volume, error) {
rpvc, err := p.K8sClient.
Expand Down Expand Up @@ -231,3 +226,20 @@ func (p *Plugin) getVolumeFromPVC(pvc v1.PersistentVolumeClaim) (*Volume, error)
}
return vol, nil
}

func (p *Plugin) downloadPVC(volumeID, snapName string) (*v1.PersistentVolumeClaim, error) {
pvc := &v1.PersistentVolumeClaim{}

filename := p.cl.GenerateRemoteFilename(volumeID, snapName)

data, ok := p.cl.Read(filename + ".pvc")
if !ok {
return nil, errors.Errorf("failed to download PVC file=%s", filename+".pvc")
}

if err := json.Unmarshal(data, pvc); err != nil {
return nil, errors.Errorf("failed to decode pvc file=%s", filename+".pvc")
}

return pvc, nil
}
32 changes: 32 additions & 0 deletions pkg/velero/restore.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package velero

import (
"sort"

"github.com/pkg/errors"
"github.com/sirupsen/logrus"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// GetRestoreNamespace return the namespace mapping for the given namespace
func GetRestoreNamespace(ns, bkpName string, log logrus.FieldLogger) (string, error) {
listOpts := metav1.ListOptions{}
list, err := clientSet.VeleroV1().Restores(veleroNs).List(listOpts)
if err != nil {
return "", errors.Wrapf(err, "failed to get list of restore")
}

sort.Sort(sort.Reverse(RestoreByCreationTimestamp(list.Items)))

for _, r := range list.Items {
if r.Status.Phase == velerov1api.RestorePhaseInProgress && r.Spec.BackupName == bkpName {
targetedNs, ok := r.Spec.NamespaceMapping[ns]
if ok {
return targetedNs, nil
}
return ns, nil
}
}
return "", errors.Errorf("restore not found for backup %s", bkpName)
}
Loading

0 comments on commit bed99ce

Please sign in to comment.