Skip to content

Commit

Permalink
support restore in different namespace
Browse files Browse the repository at this point in the history
Signed-off-by: Pawan <[email protected]>
  • Loading branch information
pawanpraka1 committed Aug 17, 2020
1 parent b263aac commit 043f9f7
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 32 deletions.
90 changes: 61 additions & 29 deletions pkg/zfs/plugin/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,14 @@ const (
restoreStatusInterval = 5
)

func (p *Plugin) downloadZFSVolume(volumeID, snapName string) (*apis.ZFSVolume, error) {
func (p *Plugin) createZFSVolume(volumeID, snapName string) (*apis.ZFSVolume, error) {
zv := &apis.ZFSVolume{}

rvol, err := utils.GetRestorePVName()
if err != nil {
return nil, errors.Errorf("zfs: failed to get restore vol name for %s", volumeID)
}

filename := p.cl.GenerateRemoteFilename(volumeID, snapName)

data, ok := p.cl.Read(filename + ".zfsvol")
Expand All @@ -47,10 +52,25 @@ func (p *Plugin) downloadZFSVolume(volumeID, snapName string) (*apis.ZFSVolume,
}

rzv := &apis.ZFSVolume{}
rzv.Name = zv.Name
rzv.Name = rvol
rzv.Spec = zv.Spec
rzv.Status.State = zfs.ZFSStatusPending

// add original volume in the label
rzv.Labels = map[string]string{zfs.ZFSBackupKey : volumeID}

_, err = volbuilder.NewKubeclient().WithNamespace(p.namespace).Create(rzv)
if err != nil {
p.Log.Errorf("zfs: create ZFSVolume failed vol %v err: %v", rzv, err)
return nil, err
}

err = p.checkVolCreation(rzv.Name)
if err != nil {
p.Log.Errorf("zfs: create waitVolume failed %s err: %v", rzv.Name, err)
return nil, err
}

return rzv, nil
}

Expand All @@ -63,66 +83,78 @@ func (p *Plugin) isVolumeReady(volumeID string) (ready bool, err error) {
return false, err
}

ready = false
if vol.Status.State == zfs.ZFSStatusReady {
ready = true
}

return ready, nil
return vol.Status.State == zfs.ZFSStatusReady, nil
}

func (p *Plugin) checkRestoreStatus(snapname string) {
rstrDone := false

for !rstrDone {
for true {
getOptions := metav1.GetOptions{}
bkp, err := restorebuilder.NewKubeclient().
rstr, err := restorebuilder.NewKubeclient().
WithNamespace(p.namespace).Get(snapname, getOptions)

if err != nil {
p.Log.Errorf("zfs: Failed to fetch restore info {%s}", snapname)
p.Log.Errorf("zfs: Failed to fetch restore {%s}", snapname)
p.cl.ExitServer = true
return
}

time.Sleep(restoreStatusInterval * time.Second)

switch bkp.Status {
switch rstr.Status {
case apis.RSTZFSStatusDone, apis.RSTZFSStatusFailed, apis.RSTZFSStatusInvalid:
rstrDone = true
p.cl.ExitServer = true
return
}

time.Sleep(restoreStatusInterval * time.Second)
}
}

func (p *Plugin) restoreVolume(volname, snapname string) error {
zv, err := p.downloadZFSVolume(volname, snapname)
if err != nil {
p.Log.Errorf("zfs: download ZFSVolume failed vol %s bkp %s", volname, snapname)
return err
func (p *Plugin) checkVolCreation(volname string) error {

for true {

getOptions := metav1.GetOptions{}
vol, err := volbuilder.NewKubeclient().
WithNamespace(p.namespace).Get(volname, getOptions)

if err != nil {
p.Log.Errorf("zfs: Failed to fetch volume {%s}", volname)
return err
}

switch vol.Status.State {
case zfs.ZFSStatusReady:
return nil
case zfs.ZFSStatusFailed:
return errors.Errorf("zfs: Error creating remote file name for restore")
}
time.Sleep(restoreStatusInterval * time.Second)
}
return nil
}

_, err = volbuilder.NewKubeclient().WithNamespace(p.namespace).Create(zv)
func (p *Plugin) restoreVolume(volname, snapname string) (string, error) {
zv, err := p.createZFSVolume(volname, snapname)
if err != nil {
p.Log.Errorf("zfs: create ZFSVolume failed vol %v err: %v", zv, err)
return err
p.Log.Errorf("zfs: download ZFSVolume failed vol %s bkp %s", volname, snapname)
return "", err
}

node := zv.Spec.OwnerNodeID

rstr, err := restorebuilder.NewBuilder().
WithName(snapname).
WithVolume(volname).
WithVolume(zv.Name).
WithNode(node).
WithStatus(apis.RSTZFSStatusInit).
WithRemote(p.remoteAddr).
Build()

if err != nil {
return err
return "", err
}
_, err = restorebuilder.NewKubeclient().WithNamespace(p.namespace).Create(rstr)
return err
return zv.Name, err
}

func (p *Plugin) doRestore(snapshotID string) (string, error) {
Expand All @@ -132,7 +164,7 @@ func (p *Plugin) doRestore(snapshotID string) (string, error) {
return "", err
}

err = p.restoreVolume(volname, snapname)
newvol, err := p.restoreVolume(volname, snapname)
if err != nil {
p.Log.Errorf("zfs: restoreVolume failed vol %s snap %s err: %v", volname, snapname, err)
return "", err
Expand Down Expand Up @@ -170,5 +202,5 @@ func (p *Plugin) doRestore(snapshotID string) (string, error) {
}

p.Log.Infof("zfs: restore done vol %s snap %s", volname, snapshotID)
return volname, nil
return newvol, nil
}
6 changes: 3 additions & 3 deletions pkg/zfs/plugin/zfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ const (
// ZFSPV_NAMESPACE config key for OpenEBS namespace
ZFSPV_NAMESPACE = "namespace"
backupStatusInterval = 5
LocalSnapshot = "local"
)

// Plugin is a plugin for containing state for the blockstore
Expand Down Expand Up @@ -156,12 +155,13 @@ func (p *Plugin) SetVolumeID(unstructuredPV runtime.Unstructured, volumeID strin
return nil, errors.WithStack(err)
}

pv.Name = volumeID
pv.Spec.PersistentVolumeSource.CSI.VolumeHandle = volumeID

res, err := runtime.DefaultUnstructuredConverter.ToUnstructured(pv)
if err != nil {
return nil, errors.WithStack(err)
}

pv.Name = volumeID

return &unstructured.Unstructured{Object: res}, nil
}
13 changes: 13 additions & 0 deletions pkg/zfs/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,15 @@ import (
"strconv"
"strings"
"github.com/pkg/errors"
"github.com/gofrs/uuid"
cloud "github.com/openebs/velero-plugin/pkg/clouduploader"
)

const (
// IdentifierKey is a word to generate snapshotID from volume name and backup name
IdentifierKey = "@"
// restored PV prefix name
RestorePrefix = "restored-"
)

func GetServerAddress() (string, error) {
Expand Down Expand Up @@ -66,3 +69,13 @@ func GetInfoFromSnapshotID(snapshotID string) (volumeID, backupName string, err
}
return
}

// GetRestorePVName return new name for clone pv for the given pv
func GetRestorePVName() (string, error) {
nuuid, err := uuid.NewV4()
if err != nil {
return "", errors.Wrapf(err, "zfs: error generating uuid for PV rename")
}

return RestorePrefix + nuuid.String(), nil
}

0 comments on commit 043f9f7

Please sign in to comment.