diff --git a/changelogs/116-pawanpraka1 b/changelogs/unreleased/116-pawanpraka1 similarity index 100% rename from changelogs/116-pawanpraka1 rename to changelogs/unreleased/116-pawanpraka1 diff --git a/changelogs/unreleased/117-pawanpraka1 b/changelogs/unreleased/117-pawanpraka1 new file mode 100644 index 00000000..ea7c7bd7 --- /dev/null +++ b/changelogs/unreleased/117-pawanpraka1 @@ -0,0 +1 @@ +wait for velero server to be ready before doing backup/restore diff --git a/go.mod b/go.mod index 2b6468ec..4d8f1934 100644 --- a/go.mod +++ b/go.mod @@ -17,7 +17,7 @@ require ( github.com/onsi/gomega v1.7.1 github.com/openebs/api v1.11.1-0.20200629052954-e52e2bcd8339 github.com/openebs/maya v0.0.0-20200411140727-1c81f9e017b0 - github.com/openebs/zfs-localpv v0.9.0-RC1.0.20200908081439-e40026c98a2b + github.com/openebs/zfs-localpv v1.0.0 github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.5.1 // indirect github.com/sirupsen/logrus v1.5.0 diff --git a/go.sum b/go.sum index 2b4bb47e..c74d5e51 100644 --- a/go.sum +++ b/go.sum @@ -472,8 +472,8 @@ github.com/openebs/api v1.11.1-0.20200629052954-e52e2bcd8339 h1:T6IuGc8zikB4XG+s github.com/openebs/api v1.11.1-0.20200629052954-e52e2bcd8339/go.mod h1:TASujm6H1LGdx43MN7Dab1xdAqR7MVU8bsS74Ywop5w= github.com/openebs/maya v0.0.0-20200411140727-1c81f9e017b0 h1:9o6+N3YkssQvUlmJnqNULSxsGFO/rb1we8MtYKr5ze4= github.com/openebs/maya v0.0.0-20200411140727-1c81f9e017b0/go.mod h1:QQY9cOHKQwZ73qbv6O//UYUBLNV2S0MRDIfG7t5KOCk= -github.com/openebs/zfs-localpv v0.9.0-RC1.0.20200908081439-e40026c98a2b h1:f7q/JlKx165iBnn8xTdFFvU6D3JGe7iz9Jtvx61UzC8= -github.com/openebs/zfs-localpv v0.9.0-RC1.0.20200908081439-e40026c98a2b/go.mod h1:EvN06mQH8s6oGq4ybAs1DLqy0fVS3CDh7xmvPLBQtpE= +github.com/openebs/zfs-localpv v1.0.0 h1:aBXyYEFN+5NTGiqT6xnqLp1N34U8AUbh9vlhSA8BQnI= +github.com/openebs/zfs-localpv v1.0.0/go.mod h1:EvN06mQH8s6oGq4ybAs1DLqy0fVS3CDh7xmvPLBQtpE= github.com/opentracing/opentracing-go v1.0.2/go.mod 
h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= github.com/openzipkin/zipkin-go v0.1.6/go.mod h1:QgAqvLzwWbR/WpD4A3cGpPtJrZXNIiJc5AZX7/PBEpw= github.com/pborman/uuid v0.0.0-20170612153648-e790cca94e6c/go.mod h1:VyrYX9gd7irzKovcSS6BIIEwPRkP2Wm2m9ufcdFSJ34= diff --git a/pkg/clouduploader/conn.go b/pkg/clouduploader/conn.go index f05e6f0a..2347957f 100644 --- a/pkg/clouduploader/conn.go +++ b/pkg/clouduploader/conn.go @@ -104,6 +104,9 @@ type Conn struct { // exitServer, if server connection needs to be stopped or not ExitServer bool + + // ConnReady describes the connection ready state + ConnReady *chan bool } // setupBucket creates a connection to a particular cloud provider's blob storage. diff --git a/pkg/clouduploader/operation.go b/pkg/clouduploader/operation.go index 8db5e5b2..c83d1aa9 100644 --- a/pkg/clouduploader/operation.go +++ b/pkg/clouduploader/operation.go @@ -133,3 +133,16 @@ func (c *Conn) GenerateRemoteFilename(file, backup string) string { } return c.backupPathPrefix + "/" + backupDir + "/" + backup + "/" + c.prefix + "-" + file + "-" + backup } + +// ConnStateInit initializes the channel and exit server flag +func (c *Conn) ConnStateInit() { + ch := make(chan bool, 1) + c.ConnReady = &ch + c.ExitServer = false +} + +// ConnReadyWait will return when connection is ready to accept the connection +func (c *Conn) ConnReadyWait() bool { + ok := <-*c.ConnReady + return ok +} diff --git a/pkg/clouduploader/server.go b/pkg/clouduploader/server.go index 5f777916..ecd556a8 100644 --- a/pkg/clouduploader/server.go +++ b/pkg/clouduploader/server.go @@ -274,6 +274,11 @@ func (s *Server) Run(opType ServerOperation, port int) error { return err } + if s.cl.ConnReady != nil { + // Connection has started listening on the specified port + *s.cl.ConnReady <- true + } + epfd, err := syscall.EpollCreate1(0) if err != nil { s.Log.Errorf("Failed to create epoll: %s", err.Error()) diff --git a/pkg/zfs/plugin/backup.go b/pkg/zfs/plugin/backup.go index b63dae8c..342d0958 
100644 --- a/pkg/zfs/plugin/backup.go +++ b/pkg/zfs/plugin/backup.go @@ -20,6 +20,7 @@ import ( "encoding/json" "sort" "strconv" + "sync" "time" "github.com/openebs/velero-plugin/pkg/zfs/utils" @@ -144,34 +145,44 @@ func (p *Plugin) createBackup(vol *apis.ZFSVolume, schdname, snapname string, po return "", err } - return bkpname, err + return bkpname, nil } -func (p *Plugin) checkBackupStatus(bkpname string) { - bkpDone := false - - for !bkpDone { +func (p *Plugin) checkBackupStatus(bkpname string) error { + for { getOptions := metav1.GetOptions{} bkp, err := bkpbuilder.NewKubeclient(). WithNamespace(p.namespace).Get(bkpname, getOptions) if err != nil { p.Log.Errorf("zfs: Failed to fetch backup info {%s}", bkpname) - p.cl.ExitServer = true - return + return errors.Errorf("zfs: error in getting bkp status err %v", err) } - time.Sleep(backupStatusInterval * time.Second) - switch bkp.Status { - case apis.BKPZFSStatusDone, apis.BKPZFSStatusFailed, apis.BKPZFSStatusInvalid: - bkpDone = true - p.cl.ExitServer = true + case apis.BKPZFSStatusDone: + return nil + case apis.BKPZFSStatusFailed, apis.BKPZFSStatusInvalid: + return errors.Errorf("zfs: error in uploading snapshot, status:{%v}", bkp.Status) } + + time.Sleep(backupStatusInterval * time.Second) + } +} + +func (p *Plugin) doUpload(wg *sync.WaitGroup, filename string, size int64, port int) { + defer wg.Done() + + ok := p.cl.Upload(filename, size, port) + if !ok { + p.Log.Errorf("zfs: Failed to upload file %s", filename) + *p.cl.ConnReady <- false } + // done with the channel, close it + close(*p.cl.ConnReady) } -func (p *Plugin) doBackup(volumeID string, snapname string, schdname string) (string, error) { +func (p *Plugin) doBackup(volumeID string, snapname string, schdname string, port int) (string, error) { pv, err := p.getPV(volumeID) if err != nil { p.Log.Errorf("zfs: Failed to get pv %s snap %s schd %s err %v", volumeID, snapname, schdname, err) @@ -201,9 +212,6 @@ func (p *Plugin) doBackup(volumeID string, 
snapname string, schdname string) (st return "", errors.Errorf("zfs: err pv is not claimed") } - // reset the exit server to false - p.cl.ExitServer = false - filename := p.cl.GenerateRemoteFilename(volumeID, snapname) if filename == "" { return "", errors.Errorf("zfs: error creating remote file name for backup") @@ -219,33 +227,45 @@ func (p *Plugin) doBackup(volumeID string, snapname string, schdname string) (st return "", errors.Errorf("zfs: error parsing the size %s", vol.Spec.Capacity) } - // TODO(pawan) should wait for upload server to be up - bkpname, err := p.createBackup(vol, schdname, snapname, ZFSBackupPort) - if err != nil { - return "", err - } + p.Log.Debugf("zfs: uploading Snapshot %s file %s", snapname, filename) - go p.checkBackupStatus(bkpname) + // reset the connection state + p.cl.ConnStateInit() - p.Log.Debugf("zfs: uploading Snapshot %s file %s", snapname, filename) - ok := p.cl.Upload(filename, size, ZFSBackupPort) + var wg sync.WaitGroup + + wg.Add(1) + go p.doUpload(&wg, filename, size, port) + + // wait for the upload server to exit + defer func() { + p.cl.ExitServer = true + wg.Wait() + p.cl.ConnReady = nil + }() + + // wait for the connection to be ready + ok := p.cl.ConnReadyWait() if !ok { - p.deleteBackup(bkpname) return "", errors.New("zfs: error in uploading snapshot") } - bkp, err := bkpbuilder.NewKubeclient(). 
- WithNamespace(p.namespace).Get(bkpname, metav1.GetOptions{}) - + bkpname, err := p.createBackup(vol, schdname, snapname, port) if err != nil { - p.deleteBackup(bkpname) return "", err } - if bkp.Status != apis.BKPZFSStatusDone { + err = p.checkBackupStatus(bkpname) + if err != nil { p.deleteBackup(bkpname) - return "", errors.Errorf("zfs: error in uploading snapshot, status:{%v}", bkp.Status) + p.Log.Errorf("zfs: backup failed vol %s snap %s bkpname %s err: %v", volumeID, snapname, bkpname, err) + return "", err } - return bkpname, nil + // generate the snapID + snapID := utils.GenerateSnapshotID(volumeID, snapname) + + p.Log.Debugf("zfs: backup done vol %s bkp %s snapID %s", volumeID, bkpname, snapID) + + return snapID, nil } diff --git a/pkg/zfs/plugin/restore.go b/pkg/zfs/plugin/restore.go index 33469926..e4c9beee 100644 --- a/pkg/zfs/plugin/restore.go +++ b/pkg/zfs/plugin/restore.go @@ -19,6 +19,7 @@ package plugin import ( "encoding/json" "strconv" + "sync" "time" "github.com/openebs/velero-plugin/pkg/velero" @@ -150,22 +151,37 @@ func (p *Plugin) isVolumeReady(volumeID string) (ready bool, err error) { return vol.Status.State == zfs.ZFSStatusReady, nil } -func (p *Plugin) checkRestoreStatus(snapname string) { +func (p *Plugin) checkRestoreStatus(rname, volname string) error { + defer func() { + err := restorebuilder.NewKubeclient().WithNamespace(p.namespace).Delete(rname) + if err != nil { + // ignore error + p.Log.Errorf("zfs: delete restore %s failed err: %v", rname, err) + } + }() + for { getOptions := metav1.GetOptions{} rstr, err := restorebuilder.NewKubeclient(). 
- WithNamespace(p.namespace).Get(snapname, getOptions) + WithNamespace(p.namespace).Get(rname, getOptions) if err != nil { - p.Log.Errorf("zfs: Failed to fetch restore {%s}", snapname) - p.cl.ExitServer = true - return + p.Log.Errorf("zfs: Failed to fetch restore {%s}", rname) + return errors.Errorf("zfs: error in getting restore status %s err %v", rname, err) } switch rstr.Status { - case apis.RSTZFSStatusDone, apis.RSTZFSStatusFailed, apis.RSTZFSStatusInvalid: - p.cl.ExitServer = true - return + case apis.RSTZFSStatusDone: + return nil + case apis.RSTZFSStatusFailed, apis.RSTZFSStatusInvalid: + // delete the volume + err = volbuilder.NewKubeclient().WithNamespace(p.namespace).Delete(volname) + if err != nil { + // ignore error + p.Log.Errorf("zfs: delete vol failed vol %s restore %s err: %v", volname, rname, err) + } + + return errors.Errorf("zfs: error in restoring %s, status:{%v}", rname, rstr.Status) } time.Sleep(restoreStatusInterval * time.Second) @@ -196,64 +212,65 @@ func (p *Plugin) checkVolCreation(volname string) error { return nil } -func (p *Plugin) cleanupRestore(oldvol, newvol, rname string) error { - rstr, err := restorebuilder.NewKubeclient(). 
- WithNamespace(p.namespace).Get(rname, metav1.GetOptions{}) - - if err != nil { - p.Log.Errorf("zfs: get restore failed vol %s => %s snap %s err: %v", oldvol, newvol, rname, err) - return err - } - - err = restorebuilder.NewKubeclient().WithNamespace(p.namespace).Delete(rname) - if err != nil { - // ignore error - p.Log.Errorf("zfs: delete restore failed vol %s => %s snap %s err: %v", oldvol, newvol, rname, err) - } - - if rstr.Status != apis.RSTZFSStatusDone { - // delete the volume - err = volbuilder.NewKubeclient().WithNamespace(p.namespace).Delete(newvol) - if err != nil { - // ignore error - p.Log.Errorf("zfs: delete vol failed vol %s => %s snap %s err: %v", oldvol, newvol, rname, err) - } - - p.Log.Errorf("zfs: restoreVolume status failed vol %s => %s snap %s", oldvol, newvol, rname) - return errors.Errorf("zfs: Failed to restore snapshoti %s, status:{%v}", rname, rstr.Status) - } - - return nil -} - // restoreVolume returns restored vol name and a boolean value indication if we need // to restore the volume. If Volume is already restored, we don't need to restore it. -func (p *Plugin) restoreVolume(rname, volname, bkpname string, port int) (string, error) { +func (p *Plugin) restoreVolume(volname, bkpname string, port int) (string, string, error) { zv, err := p.restoreZFSVolume(volname, bkpname) if err != nil { p.Log.Errorf("zfs: restore ZFSVolume failed vol %s bkp %s err %v", volname, bkpname, err) - return "", err + return "", "", err } node := zv.Spec.OwnerNodeID serverAddr := p.remoteAddr + ":" + strconv.Itoa(port) + zfsvol := zv.Name + rname := utils.GenerateSnapshotID(zfsvol, bkpname) rstr, err := restorebuilder.NewBuilder(). WithName(rname). - WithVolume(zv.Name). + WithVolume(zfsvol). WithNode(node). WithStatus(apis.RSTZFSStatusInit). WithRemote(serverAddr). 
Build() if err != nil { - return "", err + // delete the volume + verr := volbuilder.NewKubeclient().WithNamespace(p.namespace).Delete(zfsvol) + if verr != nil { + // ignore error + p.Log.Errorf("zfs: delete vol failed vol %s rname %s err: %v", zfsvol, rname, verr) + } + return "", "", err } + _, err = restorebuilder.NewKubeclient().WithNamespace(p.namespace).Create(rstr) - return zv.Name, err + + if err != nil { + // delete the volume + verr := volbuilder.NewKubeclient().WithNamespace(p.namespace).Delete(zfsvol) + if verr != nil { + // ignore error + p.Log.Errorf("zfs: delete vol failed vol %s rname %s err: %v", zfsvol, rname, verr) + } + return "", "", err + } + return zfsvol, rname, nil +} + +func (p *Plugin) doDownload(wg *sync.WaitGroup, filename string, port int) { + defer wg.Done() + + ok := p.cl.Download(filename, port) + if !ok { + p.Log.Errorf("zfs: failed to download the file %s", filename) + *p.cl.ConnReady <- false + } + // done with the channel, close it + close(*p.cl.ConnReady) } -func (p *Plugin) doRestore(snapshotID string) (string, error) { +func (p *Plugin) doRestore(snapshotID string, port int) (string, error) { volname, bkpname, err := utils.GetInfoFromSnapshotID(snapshotID) if err != nil { @@ -265,21 +282,36 @@ func (p *Plugin) doRestore(snapshotID string) (string, error) { return "", errors.Errorf("zfs: Error creating remote file name for restore") } - newvol, err := p.restoreVolume(snapshotID, volname, bkpname, ZFSRestorePort) + // init the connection state + p.cl.ConnStateInit() + + var wg sync.WaitGroup + + wg.Add(1) + go p.doDownload(&wg, filename, port) + + // wait for the download server to exit + defer func() { + p.cl.ExitServer = true + wg.Wait() + p.cl.ConnReady = nil + }() + + // wait for the connection to be ready + ok := p.cl.ConnReadyWait() + if !ok { + return "", errors.Errorf("zfs: restore server is not ready") + } + + newvol, rname, err := p.restoreVolume(volname, bkpname, port) if err != nil { p.Log.Errorf("zfs: restoreVolume 
failed vol %s snap %s err: %v", volname, bkpname, err) return "", err } - go p.checkRestoreStatus(snapshotID) - - ret := p.cl.Download(filename, ZFSRestorePort) - if !ret { - p.cleanupRestore(volname, newvol, snapshotID) - return "", errors.New("zfs: failed to restore snapshot") - } - - if err := p.cleanupRestore(volname, newvol, snapshotID); err != nil { + err = p.checkRestoreStatus(rname, newvol) + if err != nil { + p.Log.Errorf("zfs: restore failed vol %s snap %s err: %v", volname, bkpname, err) return "", err } diff --git a/pkg/zfs/plugin/zfs.go b/pkg/zfs/plugin/zfs.go index c3199d5e..0c61fce2 100644 --- a/pkg/zfs/plugin/zfs.go +++ b/pkg/zfs/plugin/zfs.go @@ -119,7 +119,7 @@ func (p *Plugin) Init(config map[string]string) error { func (p *Plugin) CreateVolumeFromSnapshot(snapshotID, volumeType, volumeAZ string, iops *int64) (string, error) { p.Log.Debugf("zfs: CreateVolumeFromSnapshot called snap %s", snapshotID) - volumeID, err := p.doRestore(snapshotID) + volumeID, err := p.doRestore(snapshotID, ZFSRestorePort) if err != nil { p.Log.Errorf("zfs: error CreateVolumeFromSnapshot returning snap %s err %v", snapshotID, err) @@ -156,7 +156,7 @@ func (p *Plugin) CreateSnapshot(volumeID, volumeAZ string, tags map[string]strin schdname, _ := tags[VeleroSchdKey] - snapshotID, err := p.doBackup(volumeID, bkpname, schdname) + snapshotID, err := p.doBackup(volumeID, bkpname, schdname, ZFSBackupPort) if err != nil { p.Log.Errorf("zfs: error createBackup %s@%s failed %v", volumeID, bkpname, err)