Skip to content

Commit

Permalink
fix multiple restore of incremental backup
Browse files Browse the repository at this point in the history
Signed-off-by: Pawan <[email protected]>
  • Loading branch information
pawanpraka1 committed Aug 24, 2020
1 parent b3b1ed7 commit c9248f8
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 61 deletions.
32 changes: 31 additions & 1 deletion pkg/zfs/plugin/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"encoding/json"
"strconv"
"github.com/pkg/errors"
v1 "k8s.io/api/core/v1"
"github.com/openebs/velero-plugin/pkg/zfs/utils"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"github.com/openebs/zfs-localpv/pkg/builder/volbuilder"
Expand All @@ -33,8 +34,17 @@ const (
VeleroBkpKey = "velero.io/backup"
VeleroSchdKey = "velero.io/schedule-name"
VeleroVolKey = "velero.io/volname"
VeleroNsKey = "velero.io/namespace"
)


func (p *Plugin) getPV(volumeID string) (*v1.PersistentVolume, error) {
return p.K8sClient.
CoreV1().
PersistentVolumes().
Get(volumeID, metav1.GetOptions{})
}

func (p *Plugin) uploadZFSVolume(vol *apis.ZFSVolume, bkpname string) error {
data, err := json.MarshalIndent(vol, "", "\t")
if err != nil {
Expand Down Expand Up @@ -167,13 +177,33 @@ func (p *Plugin) checkBackupStatus(bkpname string) {

func (p *Plugin) doBackup(volumeID string, snapname string, schdname string) (string, error) {

pv, err := p.getPV(volumeID)
if err != nil {
p.Log.Errorf("zfs: Failed to get pv %s snap %s schd %s err %v", volumeID, snapname, schdname, err)
return "", err
}

if pv.Spec.PersistentVolumeSource.CSI == nil {
return "", errors.New("zfs: err not a CSI pv")
}

volHandle := pv.Spec.PersistentVolumeSource.CSI.VolumeHandle

getOptions := metav1.GetOptions{}
vol, err := volbuilder.NewKubeclient().
WithNamespace(p.namespace).Get(volumeID, getOptions)
WithNamespace(p.namespace).Get(volHandle, getOptions)
if err != nil {
return "", err
}

if pv.Spec.ClaimRef != nil {
// add source namespace in the label to filter it at restore time
if vol.Labels == nil {
vol.Labels = map[string]string{}
}
vol.Labels[VeleroNsKey] = pv.Spec.ClaimRef.Namespace
}

err = p.uploadZFSVolume(vol, snapname)
if err != nil {
return "", err
Expand Down
136 changes: 76 additions & 60 deletions pkg/zfs/plugin/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ import (
"encoding/json"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"github.com/openebs/zfs-localpv/pkg/zfs"
"github.com/openebs/velero-plugin/pkg/velero"
"github.com/pkg/errors"
v1 "k8s.io/api/core/v1"
"github.com/openebs/velero-plugin/pkg/zfs/utils"
"github.com/openebs/zfs-localpv/pkg/builder/volbuilder"
"github.com/openebs/zfs-localpv/pkg/builder/restorebuilder"
Expand All @@ -33,29 +33,55 @@ const (
restoreStatusInterval = 5
)

func (p *Plugin) getPV(volumeID string) (*v1.PersistentVolume, error) {
return p.K8sClient.
CoreV1().
PersistentVolumes().
Get(volumeID, metav1.GetOptions{})
}
func (p *Plugin) createVolume(pvname string, bkpname string, bkpZV *apis.ZFSVolume) (*apis.ZFSVolume, bool, error) {

// get the target namespace
ns, err := velero.GetRestoreNamespace(bkpZV.Labels[VeleroNsKey], bkpname, p.Log)

if err != nil {
p.Log.Errorf("zfs: failed to get target ns pv %s bkpname %s err: %v", pvname, bkpname, err)
return nil, false, err
}

filter := metav1.ListOptions{
LabelSelector: VeleroVolKey + "=" + pvname + "," + VeleroNsKey + "=" + ns,
}
volList, err := volbuilder.NewKubeclient().WithNamespace(p.namespace).List(filter)

if err != nil {
p.Log.Errorf("zfs: failed to get source volume failed vol %s snap %s err: %v", pvname, bkpname, err)
return nil, false, err
}

var vol *apis.ZFSVolume = nil

func (p *Plugin) createVolume(volumeID string, schdname string, vol *apis.ZFSVolume, bkpZV *apis.ZFSVolume) (*apis.ZFSVolume, error) {
if len (volList.Items) > 1 {
return nil, false, errors.Errorf("zfs: error can not have more than one source volume %s", pvname, bkpname)
} else if len (volList.Items) == 1 {
vol = &volList.Items[0]
if !p.incremental ||
bkpname == vol.Annotations[VeleroBkpKey] {
// volume has already been restored
return vol, false, nil
}

p.Log.Infof("zfs: got existing volume %s for restore vol %s snap %s", vol.Name, pvname, bkpname)
}

if vol == nil {
// this is first full restore, go ahead and create the volume
rZV := &apis.ZFSVolume{}
// generate a new uuid only if PV exist
pv, err := p.getPV(volumeID)
// hack(https://github.com/vmware-tanzu/velero/pull/2835): generate a new uuid only if PV exist
pv, err := p.getPV(pvname)

if err == nil && pv != nil {
rvol, err := utils.GetRestorePVName()
if err != nil {
return nil, errors.Errorf("zfs: failed to get restore vol name for %s", volumeID)
return nil, false, errors.Errorf("zfs: failed to get restore vol name for %s", pvname)
}
rZV.Name = rvol
} else {
rZV.Name = volumeID
rZV.Name = pvname
}

rZV.Spec = bkpZV.Spec
Expand All @@ -67,71 +93,49 @@ func (p *Plugin) createVolume(volumeID string, schdname string, vol *apis.ZFSVol
rZV.Status.State = zfs.ZFSStatusPending

// add original volume and schedule name in the label
rZV.Labels = map[string]string{VeleroVolKey : volumeID, VeleroSchdKey : schdname}
rZV.Labels = map[string]string{VeleroVolKey : pvname, VeleroNsKey : ns}
rZV.Annotations = map[string]string{VeleroBkpKey : bkpname}

vol, err = volbuilder.NewKubeclient().WithNamespace(p.namespace).Create(rZV)
if err != nil {
p.Log.Errorf("zfs: create ZFSVolume failed vol %v err: %v", rZV, err)
return nil, err
return nil, false, err
}

err = p.checkVolCreation(rZV.Name)
if err != nil {
p.Log.Errorf("zfs: checkVolCreation failed %s err: %v", rZV.Name, err)
return nil, err
return nil, false, err
}
} else {
// this is incremental restore, update the ZFS volume
vol.Spec = bkpZV.Spec
vol, err := volbuilder.NewKubeclient().WithNamespace(p.namespace).Update(vol)
if err != nil {
p.Log.Errorf("zfs: update ZFSVolume failed vol %v err: %v", vol, err)
return nil, err
return nil, false, err
}
}

return vol, nil
return vol, true, nil
}

func (p *Plugin) restoreZFSVolume(volumeID, snapName string) (*apis.ZFSVolume, error) {
func (p *Plugin) restoreZFSVolume(pvname, bkpname string) (*apis.ZFSVolume, bool, error) {

bkpZV := &apis.ZFSVolume{}

filename := p.cl.GenerateRemoteFilename(volumeID, snapName)
filename := p.cl.GenerateRemoteFilename(pvname, bkpname)

data, ok := p.cl.Read(filename + ".zfsvol")
if !ok {
return nil, errors.Errorf("zfs: failed to download ZFSVolume file=%s", filename+".zfsvol")
return nil, false, errors.Errorf("zfs: failed to download ZFSVolume file=%s", filename+".zfsvol")
}

if err := json.Unmarshal(data, bkpZV); err != nil {
return nil, errors.Errorf("zfs: failed to decode zfsvolume file=%s", filename+".zfsvol")
return nil, false, errors.Errorf("zfs: failed to decode zfsvolume file=%s", filename+".zfsvol")
}

var vol *apis.ZFSVolume = nil

schdname := utils.GetScheduleName(snapName)
// it is incremental restore, find the restore volume
if p.incremental && len(schdname) > 0 {
filter := metav1.ListOptions{
LabelSelector: VeleroVolKey + "=" + volumeID + "," + VeleroSchdKey + "=" + schdname,
}
volList, err := volbuilder.NewKubeclient().WithNamespace(p.namespace).List(filter)

if err != nil {
p.Log.Errorf("zfs: failed to get source volume failed vol %s snap %s err: %v", volumeID, snapName, err)
return nil, err
}

if len (volList.Items) > 1 {
return nil, errors.Errorf("zfs: error can not have more than one source volume %s", volumeID, snapName)
} else if len (volList.Items) == 1 {
p.Log.Infof("zfs: returning volume %s for incremental restore vol %s snap %s", volList.Items[0].Name, volumeID, snapName)
vol = &volList.Items[0]
}
}

return p.createVolume(volumeID, schdname, vol, bkpZV)
return p.createVolume(pvname, bkpname, bkpZV)
}

func (p *Plugin) isVolumeReady(volumeID string) (ready bool, err error) {
Expand Down Expand Up @@ -223,11 +227,17 @@ func (p *Plugin) cleanupRestore(oldvol, newvol, rname string) error {
return nil
}

func (p *Plugin) restoreVolume(rname, volname, snapname string) (string, error) {
zv, err := p.restoreZFSVolume(volname, snapname)
// restoreVolume returns restored vol name and a boolean value indication if we need
// to restore the volume. If Volume is already restored, we don't need to restore it.
func (p *Plugin) restoreVolume(rname, volname, bkpname string) (string, bool, error) {
zv, needRestore, err := p.restoreZFSVolume(volname, bkpname)
if err != nil {
p.Log.Errorf("zfs: download ZFSVolume failed vol %s bkp %s", volname, snapname)
return "", err
p.Log.Errorf("zfs: restore ZFSVolume failed vol %s bkp %s err %v", volname, bkpname, err)
return "", false, err
}

if needRestore == false {
return zv.Name, false, nil
}

node := zv.Spec.OwnerNodeID
Expand All @@ -241,42 +251,48 @@ func (p *Plugin) restoreVolume(rname, volname, snapname string) (string, error)
Build()

if err != nil {
return "", err
return "", false, err
}
_, err = restorebuilder.NewKubeclient().WithNamespace(p.namespace).Create(rstr)
return zv.Name, err
return zv.Name, true, err
}

func (p *Plugin) doRestore(bkpname string) (string, error) {
func (p *Plugin) doRestore(snapshotID string) (string, error) {

volname, snapname, err := utils.GetInfoFromSnapshotID(bkpname)
volname, bkpname, err := utils.GetInfoFromSnapshotID(snapshotID)
if err != nil {
return "", err
}

filename := p.cl.GenerateRemoteFilename(volname, snapname)
filename := p.cl.GenerateRemoteFilename(volname, bkpname)
if filename == "" {
return "", errors.Errorf("zfs: Error creating remote file name for restore")
}

newvol, err := p.restoreVolume(bkpname, volname, snapname)
newvol, needRestore, err := p.restoreVolume(snapshotID, volname, bkpname)
if err != nil {
p.Log.Errorf("zfs: restoreVolume failed vol %s snap %s err: %v", volname, snapname, err)
p.Log.Errorf("zfs: restoreVolume failed vol %s snap %s err: %v", volname, bkpname, err)
return "", err
}

go p.checkRestoreStatus(bkpname)
if needRestore == false {
// volume has already been restored
p.Log.Infof("zfs: pv already restored vol %s => %s snap %s", volname, newvol, snapshotID)
return newvol, nil
}

go p.checkRestoreStatus(snapshotID)

ret := p.cl.Download(filename)
if !ret {
p.cleanupRestore(volname, newvol, bkpname)
p.cleanupRestore(volname, newvol, snapshotID)
return "", errors.New("zfs: failed to restore snapshot")
}

if err := p.cleanupRestore(volname, newvol, bkpname); err != nil {
if err := p.cleanupRestore(volname, newvol, snapshotID); err != nil {
return "", err
}

p.Log.Infof("zfs: restore done vol %s => %s snap %s", volname, newvol, bkpname)
p.Log.Infof("zfs: restore done vol %s => %s snap %s", volname, newvol, snapshotID)
return newvol, nil
}
5 changes: 5 additions & 0 deletions pkg/zfs/plugin/zfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"github.com/openebs/velero-plugin/pkg/zfs/utils"
"github.com/openebs/velero-plugin/pkg/velero"
cloud "github.com/openebs/velero-plugin/pkg/clouduploader"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
Expand Down Expand Up @@ -93,6 +94,10 @@ func (p *Plugin) Init(config map[string]string) error {
return errors.New("error creating k8s client")
}

if err := velero.InitializeClientSet(conf); err != nil {
return errors.Wrapf(err, "failed to initialize velero clientSet")
}

p.K8sClient = clientset

p.cl = &cloud.Conn{Log: p.Log}
Expand Down

0 comments on commit c9248f8

Please sign in to comment.