Skip to content

Commit

Permalink
feat(velero): adding velero plugin for ZFS-LocalPV
Browse files Browse the repository at this point in the history
A new provider(openebs.io/zfspv-blockstore) has been added to
support backup and restore for ZFS-LocalPV. We can use below
VolumeSnapshotLocation to backup/restore the data to/from the cloud.

```yaml
apiVersion: velero.io/v1
kind: VolumeSnapshotLocation
metadata:
  name: default
  namespace: velero
spec:
  provider: openebs.io/zfspv-blockstore
  config:
    bucket: velero
    prefix: zfs
    namespace: openebs
    provider: aws
    region: minio
    s3ForcePathStyle: "true"
    s3Url: http://minio.velero.svc:9000
```

Signed-off-by: Pawan <[email protected]>
  • Loading branch information
pawanpraka1 committed Aug 26, 2020
1 parent 7ce0bc1 commit e10fa83
Show file tree
Hide file tree
Showing 10 changed files with 1,350 additions and 6 deletions.
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ all: build
container: all
@echo ">> building container"
@cp Dockerfile _output/Dockerfile
docker build -t $(IMAGE):$(IMAGE_TAG) ${DBUILD_ARGS} -f _output/Dockerfile _output
@sudo docker build -t $(IMAGE):$(IMAGE_TAG) ${DBUILD_ARGS} -f _output/Dockerfile _output

build:
@echo ">> building binary"
Expand All @@ -96,7 +96,7 @@ gomod: ## Ensures fresh go.mod and go.sum.
# Run linter using docker image
lint-docker: gomod
@echo ">> running golangci-lint"
@docker run -i \
@sudo docker run -i \
--rm -v $$(pwd):/app -w /app \
golangci/golangci-lint:v1.24.0 \
bash -c "GOGC=75 golangci-lint run"
Expand Down
1 change: 1 addition & 0 deletions changelogs/unreleased/102-pawanpraka1
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
adding velero plugin for ZFS-LocalPV
40 changes: 36 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,53 @@ require (
github.com/aws/aws-sdk-go v1.31.13
github.com/ghodss/yaml v1.0.0
github.com/gofrs/uuid v3.2.0+incompatible
github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e // indirect
github.com/hashicorp/go-plugin v1.0.1-0.20190610192547-a1bc61569a26 // indirect
github.com/onsi/ginkgo v1.10.1
github.com/onsi/gomega v1.7.0
github.com/openebs/api v1.10.0
github.com/onsi/ginkgo v1.10.3
github.com/onsi/gomega v1.7.1
github.com/openebs/api v1.11.1-0.20200629052954-e52e2bcd8339
github.com/openebs/maya v0.0.0-20200411140727-1c81f9e017b0
github.com/openebs/zfs-localpv v1.0.1
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.5.1 // indirect
github.com/sirupsen/logrus v1.5.0
github.com/spf13/cobra v1.0.0 // indirect
github.com/vmware-tanzu/velero v1.3.2
gocloud.dev v0.20.0
golang.org/x/crypto v0.0.0-20200220183623-bac4c82f6975 // indirect
golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6 // indirect
google.golang.org/api v0.26.0
google.golang.org/appengine v1.6.6 // indirect
k8s.io/api v0.17.4
k8s.io/apimachinery v0.17.4
k8s.io/client-go v0.17.3
k8s.io/client-go v11.0.1-0.20190409021438-1a26190bd76a+incompatible
k8s.io/utils v0.0.0-20191218082557-f07c713de883 // indirect
)

replace (
github.com/openebs/zfs-localpv => /home/pawan/Desktop/openebs/src/github.com/openebs/zfs-localpv
k8s.io/api => k8s.io/api v0.15.12
k8s.io/apiextensions-apiserver => k8s.io/apiextensions-apiserver v0.15.12
k8s.io/apimachinery => k8s.io/apimachinery v0.15.13-beta.0
k8s.io/apiserver => k8s.io/apiserver v0.15.12
k8s.io/cli-runtime => k8s.io/cli-runtime v0.15.12
k8s.io/client-go => k8s.io/client-go v0.15.12
k8s.io/cloud-provider => k8s.io/cloud-provider v0.15.12
k8s.io/cluster-bootstrap => k8s.io/cluster-bootstrap v0.15.12
k8s.io/code-generator => k8s.io/code-generator v0.15.13-beta.0
k8s.io/component-base => k8s.io/component-base v0.15.12
k8s.io/cri-api => k8s.io/cri-api v0.15.13-beta.0
k8s.io/csi-translation-lib => k8s.io/csi-translation-lib v0.15.12
k8s.io/kube-aggregator => k8s.io/kube-aggregator v0.15.12
k8s.io/kube-controller-manager => k8s.io/kube-controller-manager v0.15.12
k8s.io/kube-proxy => k8s.io/kube-proxy v0.15.12
k8s.io/kube-scheduler => k8s.io/kube-scheduler v0.15.12
k8s.io/kubectl => k8s.io/kubectl v0.15.13-beta.0
k8s.io/kubelet => k8s.io/kubelet v0.15.12
k8s.io/legacy-cloud-providers => k8s.io/legacy-cloud-providers v0.15.12
k8s.io/metrics => k8s.io/metrics v0.15.12
k8s.io/node-api => k8s.io/node-api v0.15.12
k8s.io/sample-apiserver => k8s.io/sample-apiserver v0.15.12
k8s.io/sample-cli-plugin => k8s.io/sample-cli-plugin v0.15.12
k8s.io/sample-controller => k8s.io/sample-controller v0.15.12
)
379 changes: 379 additions & 0 deletions go.sum

Large diffs are not rendered by default.

251 changes: 251 additions & 0 deletions pkg/zfs/plugin/backup.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,251 @@
/*
Copyright 2020 the Velero contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package plugin

import (
"time"
"sort"
"encoding/json"
"strconv"
"github.com/pkg/errors"
v1 "k8s.io/api/core/v1"
"github.com/openebs/velero-plugin/pkg/zfs/utils"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"github.com/openebs/zfs-localpv/pkg/builder/volbuilder"
"github.com/openebs/zfs-localpv/pkg/builder/bkpbuilder"
apis "github.com/openebs/zfs-localpv/pkg/apis/openebs.io/zfs/v1"
)

const (
VeleroBkpKey = "velero.io/backup"
VeleroSchdKey = "velero.io/schedule-name"
VeleroVolKey = "velero.io/volname"
VeleroNsKey = "velero.io/namespace"
)


func (p *Plugin) getPV(volumeID string) (*v1.PersistentVolume, error) {
return p.K8sClient.
CoreV1().
PersistentVolumes().
Get(volumeID, metav1.GetOptions{})
}

func (p *Plugin) uploadZFSVolume(vol *apis.ZFSVolume, filename string) error {
data, err := json.MarshalIndent(vol, "", "\t")
if err != nil {
return errors.New("zfs: error doing json parsing")
}

if ok := p.cl.Write(data, filename+".zfsvol"); !ok {
return errors.New("zfs: failed to upload ZFSVolume")
}

return nil
}

// deleteBackup deletes the backup resource
func (p *Plugin) deleteBackup(snapshotID string) error {
err := bkpbuilder.NewKubeclient().WithNamespace(p.namespace).Delete(snapshotID)
if err != nil {
p.Log.Errorf("zfs: Failed to delete the backup %s", snapshotID)
}

return err
}

func (p *Plugin) getPrevSnap(volname, schdname string) (string, error) {
listOptions := metav1.ListOptions{
LabelSelector: VeleroSchdKey + "=" + schdname + "," + VeleroVolKey + "=" + volname,
}

bkpList, err := bkpbuilder.NewKubeclient().
WithNamespace(p.namespace).List(listOptions)

if err != nil {
return "", err
}

var backups []string

/*
* Backup names are in the form of <schdeule>-<yyyymmddhhmmss>
* to get the last snapshot, sort the list of successful backups,
* the previous snapshot will be the last element in the sorted list
*/
if len (bkpList.Items) > 0 {
for _, bkp := range bkpList.Items {
if bkp.Status == apis.BKPZFSStatusDone {
backups = append(backups, bkp.Spec.SnapName)
}
}
size := len(backups)
sort.Strings(backups)
return backups[size - 1], nil
}

return "", nil
}

func (p *Plugin) createBackup(vol *apis.ZFSVolume, schdname string, snapname string) (string, error) {

bkpname := utils.GenerateSnapshotID(vol.Name, snapname)

p.Log.Infof("zfs: creating ZFSBackup vol = %s bkp = %s schd = %s", vol.Name, bkpname, schdname)

var err error
labels := map[string]string{}
prevSnap := ""

if len(schdname) > 0 {
// add schdeule name as label
labels[VeleroSchdKey] = schdname
labels[VeleroVolKey] = vol.Name
if p.incremental {
prevSnap, err = p.getPrevSnap(vol.Name, schdname)
if err != nil {
p.Log.Errorf("zfs: Failed to get prev snapshot bkp %s err: {%v}", snapname, err)
return "", err
}
}
}

bkp, err := bkpbuilder.NewBuilder().
WithName(bkpname).
WithLabels(labels).
WithVolume(vol.Name).
WithPrevSnap(prevSnap).
WithSnap(snapname).
WithNode(vol.Spec.OwnerNodeID).
WithStatus(apis.BKPZFSStatusInit).
WithRemote(p.remoteAddr).
Build()

if err != nil {
return "", err
}
_, err = bkpbuilder.NewKubeclient().WithNamespace(p.namespace).Create(bkp)
if err != nil {
return "", err
}

return bkpname, err
}

func (p *Plugin) checkBackupStatus(bkpname string) {
bkpDone := false

for !bkpDone {
getOptions := metav1.GetOptions{}
bkp, err := bkpbuilder.NewKubeclient().
WithNamespace(p.namespace).Get(bkpname, getOptions)

if err != nil {
p.Log.Errorf("zfs: Failed to fetch backup info {%s}", bkpname)
p.cl.ExitServer = true
return
}

time.Sleep(backupStatusInterval * time.Second)

switch bkp.Status {
case apis.BKPZFSStatusDone, apis.BKPZFSStatusFailed, apis.BKPZFSStatusInvalid:
bkpDone = true
p.cl.ExitServer = true
}
}
}

func (p *Plugin) doBackup(volumeID string, snapname string, schdname string) (string, error) {

pv, err := p.getPV(volumeID)
if err != nil {
p.Log.Errorf("zfs: Failed to get pv %s snap %s schd %s err %v", volumeID, snapname, schdname, err)
return "", err
}

if pv.Spec.PersistentVolumeSource.CSI == nil {
return "", errors.New("zfs: err not a CSI pv")
}

volHandle := pv.Spec.PersistentVolumeSource.CSI.VolumeHandle

getOptions := metav1.GetOptions{}
vol, err := volbuilder.NewKubeclient().
WithNamespace(p.namespace).Get(volHandle, getOptions)
if err != nil {
return "", err
}

if pv.Spec.ClaimRef != nil {
// add source namespace in the label to filter it at restore time
if vol.Labels == nil {
vol.Labels = map[string]string{}
}
vol.Labels[VeleroNsKey] = pv.Spec.ClaimRef.Namespace
} else {
return "", errors.Errorf("zfs: err pv is not claimed")
}

// reset the exit server to false
p.cl.ExitServer = false

filename := p.cl.GenerateRemoteFilename(volumeID, snapname)
if filename == "" {
return "", errors.Errorf("zfs: error creating remote file name for backup")
}

err = p.uploadZFSVolume(vol, filename)
if err != nil {
return "", err
}

size, err := strconv.ParseInt(vol.Spec.Capacity, 10, 64)
if err != nil {
return "", errors.Errorf("zfs: error parsing the size %s", vol.Spec.Capacity)
}

// TODO(pawan) should wait for upload server to be up
bkpname, err := p.createBackup(vol, schdname, snapname)
if err != nil {
return "", err
}

go p.checkBackupStatus(bkpname)

p.Log.Infof("zfs: uploading Snapshot %s file %s", snapname, filename)
ok := p.cl.Upload(filename, size)
if !ok {
p.deleteBackup(bkpname)
return "", errors.New("zfs: error in uploading snapshot")
}

bkp, err := bkpbuilder.NewKubeclient().
WithNamespace(p.namespace).Get(bkpname, metav1.GetOptions{})

if err != nil {
p.deleteBackup(bkpname)
return "", err
}

if bkp.Status != apis.BKPZFSStatusDone {
p.deleteBackup(bkpname)
return "", errors.Errorf("zfs: error in uploading snapshot, status:{%v}", bkp.Status)
}

return bkpname, nil
}
Loading

0 comments on commit e10fa83

Please sign in to comment.