Skip to content

Commit

Permalink
feat(velero): wait for plugin server to be UP before doing backup/res…
Browse files Browse the repository at this point in the history
…tore (#117)

We are starting the restore operation and then starting the server.
It could lead to a connection failure if server is not ready when
ZFS-LocalPV starts the restore operation.

Here, we are waiting for the server to be up before starting the
restore operation. Also re-factored few codes.

Signed-off-by: Pawan <[email protected]>
  • Loading branch information
pawanpraka1 authored Sep 30, 2020
1 parent 9a77ff7 commit 9a3232a
Show file tree
Hide file tree
Showing 10 changed files with 165 additions and 91 deletions.
File renamed without changes.
1 change: 1 addition & 0 deletions changelogs/unreleased/117-pawanpraka1
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
wait for plugin server to be ready before doing backup/restore
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ require (
github.com/onsi/gomega v1.7.1
github.com/openebs/api v1.11.1-0.20200629052954-e52e2bcd8339
github.com/openebs/maya v0.0.0-20200411140727-1c81f9e017b0
github.com/openebs/zfs-localpv v0.9.0-RC1.0.20200908081439-e40026c98a2b
github.com/openebs/zfs-localpv v1.0.0
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.5.1 // indirect
github.com/sirupsen/logrus v1.5.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -472,8 +472,8 @@ github.com/openebs/api v1.11.1-0.20200629052954-e52e2bcd8339 h1:T6IuGc8zikB4XG+s
github.com/openebs/api v1.11.1-0.20200629052954-e52e2bcd8339/go.mod h1:TASujm6H1LGdx43MN7Dab1xdAqR7MVU8bsS74Ywop5w=
github.com/openebs/maya v0.0.0-20200411140727-1c81f9e017b0 h1:9o6+N3YkssQvUlmJnqNULSxsGFO/rb1we8MtYKr5ze4=
github.com/openebs/maya v0.0.0-20200411140727-1c81f9e017b0/go.mod h1:QQY9cOHKQwZ73qbv6O//UYUBLNV2S0MRDIfG7t5KOCk=
github.com/openebs/zfs-localpv v0.9.0-RC1.0.20200908081439-e40026c98a2b h1:f7q/JlKx165iBnn8xTdFFvU6D3JGe7iz9Jtvx61UzC8=
github.com/openebs/zfs-localpv v0.9.0-RC1.0.20200908081439-e40026c98a2b/go.mod h1:EvN06mQH8s6oGq4ybAs1DLqy0fVS3CDh7xmvPLBQtpE=
github.com/openebs/zfs-localpv v1.0.0 h1:aBXyYEFN+5NTGiqT6xnqLp1N34U8AUbh9vlhSA8BQnI=
github.com/openebs/zfs-localpv v1.0.0/go.mod h1:EvN06mQH8s6oGq4ybAs1DLqy0fVS3CDh7xmvPLBQtpE=
github.com/opentracing/opentracing-go v1.0.2/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
github.com/openzipkin/zipkin-go v0.1.6/go.mod h1:QgAqvLzwWbR/WpD4A3cGpPtJrZXNIiJc5AZX7/PBEpw=
github.com/pborman/uuid v0.0.0-20170612153648-e790cca94e6c/go.mod h1:VyrYX9gd7irzKovcSS6BIIEwPRkP2Wm2m9ufcdFSJ34=
Expand Down
3 changes: 3 additions & 0 deletions pkg/clouduploader/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,9 @@ type Conn struct {

// exitServer, if server connection needs to be stopped or not
ExitServer bool

// ConnReady describes the connection ready state
ConnReady *chan bool
}

// setupBucket creates a connection to a particular cloud provider's blob storage.
Expand Down
13 changes: 13 additions & 0 deletions pkg/clouduploader/operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,3 +133,16 @@ func (c *Conn) GenerateRemoteFilename(file, backup string) string {
}
return c.backupPathPrefix + "/" + backupDir + "/" + backup + "/" + c.prefix + "-" + file + "-" + backup
}

// ConnStateReset resets the channel and exit server flag
func (c *Conn) ConnStateReset() {
ch := make(chan bool, 1)
c.ConnReady = &ch
c.ExitServer = false
}

// ConnReadyWait will return when connection is ready to accept the connection
func (c *Conn) ConnReadyWait() bool {
ok := <-*c.ConnReady
return ok
}
5 changes: 5 additions & 0 deletions pkg/clouduploader/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,11 @@ func (s *Server) Run(opType ServerOperation, port int) error {
return err
}

if s.cl.ConnReady != nil {
// Connection has started listening on the specified port
*s.cl.ConnReady <- true
}

epfd, err := syscall.EpollCreate1(0)
if err != nil {
s.Log.Errorf("Failed to create epoll: %s", err.Error())
Expand Down
84 changes: 52 additions & 32 deletions pkg/zfs/plugin/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"encoding/json"
"sort"
"strconv"
"sync"
"time"

"github.com/openebs/velero-plugin/pkg/zfs/utils"
Expand Down Expand Up @@ -144,34 +145,44 @@ func (p *Plugin) createBackup(vol *apis.ZFSVolume, schdname, snapname string, po
return "", err
}

return bkpname, err
return bkpname, nil
}

func (p *Plugin) checkBackupStatus(bkpname string) {
bkpDone := false

for !bkpDone {
func (p *Plugin) checkBackupStatus(bkpname string) error {
for {
getOptions := metav1.GetOptions{}
bkp, err := bkpbuilder.NewKubeclient().
WithNamespace(p.namespace).Get(bkpname, getOptions)

if err != nil {
p.Log.Errorf("zfs: Failed to fetch backup info {%s}", bkpname)
p.cl.ExitServer = true
return
return errors.Errorf("zfs: error in getting bkp status err %v", err)
}

time.Sleep(backupStatusInterval * time.Second)

switch bkp.Status {
case apis.BKPZFSStatusDone, apis.BKPZFSStatusFailed, apis.BKPZFSStatusInvalid:
bkpDone = true
p.cl.ExitServer = true
case apis.BKPZFSStatusDone:
return nil
case apis.BKPZFSStatusFailed, apis.BKPZFSStatusInvalid:
return errors.Errorf("zfs: error in uploading snapshot, status:{%v}", bkp.Status)
}

time.Sleep(backupStatusInterval * time.Second)
}
}

func (p *Plugin) doUpload(wg *sync.WaitGroup, filename string, size int64, port int) {
defer wg.Done()

ok := p.cl.Upload(filename, size, port)
if !ok {
p.Log.Errorf("zfs: Failed to upload file %s", filename)
*p.cl.ConnReady <- false
}
// done with the channel, close it
close(*p.cl.ConnReady)
}

func (p *Plugin) doBackup(volumeID string, snapname string, schdname string) (string, error) {
func (p *Plugin) doBackup(volumeID string, snapname string, schdname string, port int) (string, error) {
pv, err := p.getPV(volumeID)
if err != nil {
p.Log.Errorf("zfs: Failed to get pv %s snap %s schd %s err %v", volumeID, snapname, schdname, err)
Expand Down Expand Up @@ -201,9 +212,6 @@ func (p *Plugin) doBackup(volumeID string, snapname string, schdname string) (st
return "", errors.Errorf("zfs: err pv is not claimed")
}

// reset the exit server to false
p.cl.ExitServer = false

filename := p.cl.GenerateRemoteFilename(volumeID, snapname)
if filename == "" {
return "", errors.Errorf("zfs: error creating remote file name for backup")
Expand All @@ -219,33 +227,45 @@ func (p *Plugin) doBackup(volumeID string, snapname string, schdname string) (st
return "", errors.Errorf("zfs: error parsing the size %s", vol.Spec.Capacity)
}

// TODO(pawan) should wait for upload server to be up
bkpname, err := p.createBackup(vol, schdname, snapname, ZFSBackupPort)
if err != nil {
return "", err
}
p.Log.Debugf("zfs: uploading Snapshot %s file %s", snapname, filename)

go p.checkBackupStatus(bkpname)
// reset the connection state
p.cl.ConnStateReset()

p.Log.Debugf("zfs: uploading Snapshot %s file %s", snapname, filename)
ok := p.cl.Upload(filename, size, ZFSBackupPort)
var wg sync.WaitGroup

wg.Add(1)
go p.doUpload(&wg, filename, size, port)

// wait for the upload server to exit
defer func() {
p.cl.ExitServer = true
wg.Wait()
p.cl.ConnReady = nil
}()

// wait for the connection to be ready
ok := p.cl.ConnReadyWait()
if !ok {
p.deleteBackup(bkpname)
return "", errors.New("zfs: error in uploading snapshot")
}

bkp, err := bkpbuilder.NewKubeclient().
WithNamespace(p.namespace).Get(bkpname, metav1.GetOptions{})

bkpname, err := p.createBackup(vol, schdname, snapname, port)
if err != nil {
p.deleteBackup(bkpname)
return "", err
}

if bkp.Status != apis.BKPZFSStatusDone {
err = p.checkBackupStatus(bkpname)
if err != nil {
p.deleteBackup(bkpname)
return "", errors.Errorf("zfs: error in uploading snapshot, status:{%v}", bkp.Status)
p.Log.Errorf("zfs: backup failed vol %s snap %s bkpname %s err: %v", volumeID, snapname, bkpname, err)
return "", err
}

return bkpname, nil
// generate the snapID
snapID := utils.GenerateSnapshotID(volumeID, snapname)

p.Log.Debugf("zfs: backup done vol %s bkp %s snapID %s", volumeID, bkpname, snapID)

return snapID, nil
}
Loading

0 comments on commit 9a3232a

Please sign in to comment.