Skip to content

Commit

Permalink
Add support for block volumes
Browse files Browse the repository at this point in the history
Signed-off-by: David Zaninovic <[email protected]>
  • Loading branch information
dzaninovic committed Sep 7, 2023
1 parent cddc89e commit 4aff462
Show file tree
Hide file tree
Showing 29 changed files with 503 additions and 99 deletions.
1 change: 1 addition & 0 deletions changelogs/unreleased/6680-dzaninovic
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add support for block volumes with Kopia
6 changes: 6 additions & 0 deletions design/CLI/PoC/overlays/plugins/node-agent.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ spec:
- mountPath: /host_pods
mountPropagation: HostToContainer
name: host-pods
- mountPath: /var/lib/kubelet/plugins
mountPropagation: HostToContainer
name: host-plugins
- mountPath: /scratch
name: scratch
- mountPath: /credentials
Expand All @@ -60,6 +63,9 @@ spec:
- hostPath:
path: /var/lib/kubelet/pods
name: host-pods
- hostPath:
path: /var/lib/kubelet/plugins
name: host-plugins
- emptyDir: {}
name: scratch
- name: cloud-credentials
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -703,7 +703,7 @@ type Provider interface {
In this case, we will extend the default kopia uploader to add the ability, when a given volume is for a block mode and is mapped as a device, we will use the [StreamingFile](https://pkg.go.dev/github.com/kopia/[email protected]/fs#StreamingFile) to stream the device and backup to the kopia repository.
```go
func getLocalBlockEntry(kopiaEntry fs.Entry, log logrus.FieldLogger) (fs.Entry, error) {
func getLocalBlockEntry(kopiaEntry fs.Entry) (fs.Entry, error) {
path := kopiaEntry.LocalFilesystemPath()

fileInfo, err := os.Lstat(path)
Expand Down Expand Up @@ -748,7 +748,7 @@ In the `pkg/uploader/kopia/snapshot.go` this is used in the Backup call like
}

if volMode == PersistentVolumeBlock {
sourceEntry, err = getLocalBlockEntry(sourceEntry, log)
sourceEntry, err = getLocalBlockEntry(sourceEntry)
if err != nil {
return nil, false, errors.Wrap(err, "Unable to get local block device entry")
}
Expand Down
6 changes: 6 additions & 0 deletions pkg/builder/persistent_volume_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,12 @@ func (b *PersistentVolumeBuilder) StorageClass(name string) *PersistentVolumeBui
return b
}

// VolumeMode sets the PersistentVolume's volume mode.
func (b *PersistentVolumeBuilder) VolumeMode(volMode corev1api.PersistentVolumeMode) *PersistentVolumeBuilder {
b.object.Spec.VolumeMode = &volMode
return b
}

// NodeAffinityRequired sets the PersistentVolume's NodeAffinity Requirement.
func (b *PersistentVolumeBuilder) NodeAffinityRequired(req *corev1api.NodeSelector) *PersistentVolumeBuilder {
b.object.Spec.NodeAffinity = &corev1api.VolumeNodeAffinity{
Expand Down
3 changes: 3 additions & 0 deletions pkg/cmd/cli/install/install.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ type Options struct {
BackupStorageConfig flag.Map
VolumeSnapshotConfig flag.Map
UseNodeAgent bool
PrivilegedAgent bool
//TODO remove UseRestic when migration test out of using it
UseRestic bool
Wait bool
Expand Down Expand Up @@ -110,6 +111,7 @@ func (o *Options) BindFlags(flags *pflag.FlagSet) {
flags.BoolVar(&o.RestoreOnly, "restore-only", o.RestoreOnly, "Run the server in restore-only mode. Optional.")
flags.BoolVar(&o.DryRun, "dry-run", o.DryRun, "Generate resources, but don't send them to the cluster. Use with -o. Optional.")
flags.BoolVar(&o.UseNodeAgent, "use-node-agent", o.UseNodeAgent, "Create Velero node-agent daemonset. Optional. Velero node-agent hosts Velero modules that need to run in one or more nodes(i.e. Restic, Kopia).")
flags.BoolVar(&o.PrivilegedAgent, "privileged-agent", o.PrivilegedAgent, "Use privileged mode for the node agent. Optional. Required to backup block devices.")
flags.BoolVar(&o.Wait, "wait", o.Wait, "Wait for Velero deployment to be ready. Optional.")
flags.DurationVar(&o.DefaultRepoMaintenanceFrequency, "default-repo-maintain-frequency", o.DefaultRepoMaintenanceFrequency, "How often 'maintain' is run for backup repositories by default. Optional.")
flags.DurationVar(&o.GarbageCollectionFrequency, "garbage-collection-frequency", o.GarbageCollectionFrequency, "How often the garbage collection runs for expired backups.(default 1h)")
Expand Down Expand Up @@ -198,6 +200,7 @@ func (o *Options) AsVeleroOptions() (*install.VeleroOptions, error) {
SecretData: secretData,
RestoreOnly: o.RestoreOnly,
UseNodeAgent: o.UseNodeAgent,
PrivilegedAgent: o.PrivilegedAgent,
UseVolumeSnapshots: o.UseVolumeSnapshots,
BSLConfig: o.BackupStorageConfig.Data(),
VSLConfig: o.VolumeSnapshotConfig.Data(),
Expand Down
28 changes: 23 additions & 5 deletions pkg/controller/data_upload_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,10 @@ func (r *DataUploadReconciler) Reconcile(ctx context.Context, req ctrl.Request)
return ctrl.Result{}, nil
}

exposeParam := r.setupExposeParam(du)
exposeParam, err := r.setupExposeParam(du)
if err != nil {
return r.errorOut(ctx, du, err, "failed to set exposer parameters", log)
}

// Expose() will trigger to create one pod whose volume is restored by a given volume snapshot,
// but the pod maybe is not in the same node of the current controller, so we need to return it here.
Expand Down Expand Up @@ -733,18 +736,33 @@ func (r *DataUploadReconciler) closeDataPath(ctx context.Context, duName string)
r.dataPathMgr.RemoveAsyncBR(duName)
}

func (r *DataUploadReconciler) setupExposeParam(du *velerov2alpha1api.DataUpload) interface{} {
func (r *DataUploadReconciler) setupExposeParam(du *velerov2alpha1api.DataUpload) (interface{}, error) {
if du.Spec.SnapshotType == velerov2alpha1api.SnapshotTypeCSI {
pvc := &corev1.PersistentVolumeClaim{}
err := r.client.Get(context.Background(), types.NamespacedName{
Namespace: du.Spec.SourceNamespace,
Name: du.Spec.SourcePVC,
}, pvc)

if err != nil {
return nil, errors.Wrapf(err, "failed to get PVC %s/%s", du.Spec.SourceNamespace, du.Spec.SourcePVC)
}

accessMode := exposer.AccessModeFileSystem
if pvc.Spec.VolumeMode != nil && *pvc.Spec.VolumeMode == corev1.PersistentVolumeBlock {
accessMode = exposer.AccessModeBlock
}

return &exposer.CSISnapshotExposeParam{
SnapshotName: du.Spec.CSISnapshot.VolumeSnapshot,
SourceNamespace: du.Spec.SourceNamespace,
StorageClass: du.Spec.CSISnapshot.StorageClass,
HostingPodLabels: map[string]string{velerov1api.DataUploadLabel: du.Name},
AccessMode: exposer.AccessModeFileSystem,
AccessMode: accessMode,
Timeout: du.Spec.OperationTimeout.Duration,
}
}, nil
}
return nil
return nil, nil
}

func (r *DataUploadReconciler) setupWaitExposePara(du *velerov2alpha1api.DataUpload) interface{} {
Expand Down
18 changes: 17 additions & 1 deletion pkg/controller/data_upload_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,7 @@ func TestReconcile(t *testing.T) {
name string
du *velerov2alpha1api.DataUpload
pod *corev1.Pod
pvc *corev1.PersistentVolumeClaim
snapshotExposerList map[velerov2alpha1api.SnapshotType]exposer.SnapshotExposer
dataMgr *datapath.Manager
expectedProcessed bool
Expand Down Expand Up @@ -345,11 +346,21 @@ func TestReconcile(t *testing.T) {
}, {
name: "Dataupload should be accepted",
du: dataUploadBuilder().Result(),
pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1.Volume{Name: "dataupload-1"}).Result(),
pod: builder.ForPod("fake-ns", dataUploadName).Volumes(&corev1.Volume{Name: "test-pvc"}).Result(),
pvc: builder.ForPersistentVolumeClaim("fake-ns", "test-pvc").Result(),
expectedProcessed: false,
expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseAccepted).Result(),
expectedRequeue: ctrl.Result{},
},
{
name: "Dataupload should fail to get PVC information",
du: dataUploadBuilder().Result(),
pod: builder.ForPod("fake-ns", dataUploadName).Volumes(&corev1.Volume{Name: "wrong-pvc"}).Result(),
expectedProcessed: true,
expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseFailed).Result(),
expectedRequeue: ctrl.Result{},
expectedErrMsg: "failed to get PVC",
},
{
name: "Dataupload should be prepared",
du: dataUploadBuilder().SnapshotType(fakeSnapshotType).Result(),
Expand Down Expand Up @@ -448,6 +459,11 @@ func TestReconcile(t *testing.T) {
require.NoError(t, err)
}

if test.pvc != nil {
err = r.client.Create(ctx, test.pvc)
require.NoError(t, err)
}

if test.dataMgr != nil {
r.dataPathMgr = test.dataMgr
} else {
Expand Down
15 changes: 8 additions & 7 deletions pkg/datapath/file_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,10 +133,11 @@ func (fs *fileSystemBR) StartBackup(source AccessPoint, realSource string, paren
if !fs.initialized {
return errors.New("file system data path is not initialized")
}
volMode := getPersistentVolumeMode(source)

volMode, sourcePath := getPersistentVolumeMode(source)

go func() {
snapshotID, emptySnapshot, err := fs.uploaderProv.RunBackup(fs.ctx, source.ByPath, realSource, tags, forceFull, parentSnapshot, volMode, fs)
snapshotID, emptySnapshot, err := fs.uploaderProv.RunBackup(fs.ctx, sourcePath, realSource, tags, forceFull, parentSnapshot, volMode, fs)

if err == provider.ErrorCanceled {
fs.callbacks.OnCancelled(context.Background(), fs.namespace, fs.jobName)
Expand All @@ -155,10 +156,10 @@ func (fs *fileSystemBR) StartRestore(snapshotID string, target AccessPoint) erro
return errors.New("file system data path is not initialized")
}

volMode := getPersistentVolumeMode(target)
volMode, targetPath := getPersistentVolumeMode(target)

go func() {
err := fs.uploaderProv.RunRestore(fs.ctx, snapshotID, target.ByPath, volMode, fs)
err := fs.uploaderProv.RunRestore(fs.ctx, snapshotID, targetPath, volMode, fs)

if err == provider.ErrorCanceled {
fs.callbacks.OnCancelled(context.Background(), fs.namespace, fs.jobName)
Expand All @@ -172,11 +173,11 @@ func (fs *fileSystemBR) StartRestore(snapshotID string, target AccessPoint) erro
return nil
}

func getPersistentVolumeMode(source AccessPoint) uploader.PersistentVolumeMode {
func getPersistentVolumeMode(source AccessPoint) (uploader.PersistentVolumeMode, string) {
if source.ByBlock != "" {
return uploader.PersistentVolumeBlock
return uploader.PersistentVolumeBlock, source.ByBlock
}
return uploader.PersistentVolumeFilesystem
return uploader.PersistentVolumeFilesystem, source.ByPath
}

// UpdateProgress which implement ProgressUpdater interface to update progress status
Expand Down
28 changes: 22 additions & 6 deletions pkg/exposer/csi_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,9 +233,12 @@ func (e *csiSnapshotExposer) CleanUp(ctx context.Context, ownerObject corev1.Obj
}

func getVolumeModeByAccessMode(accessMode string) (corev1.PersistentVolumeMode, error) {
if accessMode == AccessModeFileSystem {
switch accessMode {
case AccessModeFileSystem:
return corev1.PersistentVolumeFilesystem, nil
} else {
case AccessModeBlock:
return corev1.PersistentVolumeBlock, nil
default:
return "", errors.Errorf("unsupported access mode %s", accessMode)
}
}
Expand Down Expand Up @@ -355,6 +358,21 @@ func (e *csiSnapshotExposer) createBackupPod(ctx context.Context, ownerObject co

var gracePeriod int64 = 0

var volumeMounts []corev1.VolumeMount = nil
var volumeDevices []corev1.VolumeDevice = nil

if backupPVC.Spec.VolumeMode != nil && *backupPVC.Spec.VolumeMode == corev1.PersistentVolumeBlock {
volumeDevices = []corev1.VolumeDevice{{
Name: volumeName,
DevicePath: "/" + volumeName,
}}
} else {
volumeMounts = []corev1.VolumeMount{{
Name: volumeName,
MountPath: "/" + volumeName,
}}
}

pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: podName,
Expand All @@ -377,10 +395,8 @@ func (e *csiSnapshotExposer) createBackupPod(ctx context.Context, ownerObject co
Image: podInfo.image,
ImagePullPolicy: corev1.PullNever,
Command: []string{"/velero-helper", "pause"},
VolumeMounts: []corev1.VolumeMount{{
Name: volumeName,
MountPath: "/" + volumeName,
}},
VolumeMounts: volumeMounts,
VolumeDevices: volumeDevices,
},
},
ServiceAccountName: podInfo.serviceAccount,
Expand Down
26 changes: 20 additions & 6 deletions pkg/exposer/generic_restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (e *genericRestoreExposer) Expose(ctx context.Context, ownerObject corev1.O
return errors.Errorf("Target PVC %s/%s has already been bound, abort", sourceNamespace, targetPVCName)
}

restorePod, err := e.createRestorePod(ctx, ownerObject, hostingPodLabels, selectedNode)
restorePod, err := e.createRestorePod(ctx, ownerObject, targetPVC, hostingPodLabels, selectedNode)
if err != nil {
return errors.Wrapf(err, "error to create restore pod")
}
Expand Down Expand Up @@ -247,7 +247,8 @@ func (e *genericRestoreExposer) RebindVolume(ctx context.Context, ownerObject co
return nil
}

func (e *genericRestoreExposer) createRestorePod(ctx context.Context, ownerObject corev1.ObjectReference, label map[string]string, selectedNode string) (*corev1.Pod, error) {
func (e *genericRestoreExposer) createRestorePod(ctx context.Context, ownerObject corev1.ObjectReference, targetPVC *corev1.PersistentVolumeClaim,
label map[string]string, selectedNode string) (*corev1.Pod, error) {
restorePodName := ownerObject.Name
restorePVCName := ownerObject.Name

Expand All @@ -261,6 +262,21 @@ func (e *genericRestoreExposer) createRestorePod(ctx context.Context, ownerObjec

var gracePeriod int64 = 0

var volumeMounts []corev1.VolumeMount = nil
var volumeDevices []corev1.VolumeDevice = nil

if targetPVC.Spec.VolumeMode != nil && *targetPVC.Spec.VolumeMode == corev1.PersistentVolumeBlock {
volumeDevices = []corev1.VolumeDevice{{
Name: volumeName,
DevicePath: "/" + volumeName,
}}
} else {
volumeMounts = []corev1.VolumeMount{{
Name: volumeName,
MountPath: "/" + volumeName,
}}
}

pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: restorePodName,
Expand All @@ -283,10 +299,8 @@ func (e *genericRestoreExposer) createRestorePod(ctx context.Context, ownerObjec
Image: podInfo.image,
ImagePullPolicy: corev1.PullNever,
Command: []string{"/velero-helper", "pause"},
VolumeMounts: []corev1.VolumeMount{{
Name: volumeName,
MountPath: "/" + volumeName,
}},
VolumeMounts: volumeMounts,
VolumeDevices: volumeDevices,
},
},
ServiceAccountName: podInfo.serviceAccount,
Expand Down
18 changes: 13 additions & 5 deletions pkg/exposer/host_path.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"

"github.com/vmware-tanzu/velero/pkg/datapath"
"github.com/vmware-tanzu/velero/pkg/uploader"
"github.com/vmware-tanzu/velero/pkg/util/filesystem"
"github.com/vmware-tanzu/velero/pkg/util/kube"
)
Expand All @@ -38,14 +39,19 @@ func GetPodVolumeHostPath(ctx context.Context, pod *corev1.Pod, volumeName strin
cli ctrlclient.Client, fs filesystem.Interface, log logrus.FieldLogger) (datapath.AccessPoint, error) {
logger := log.WithField("pod name", pod.Name).WithField("pod UID", pod.GetUID()).WithField("volume", volumeName)

volDir, err := getVolumeDirectory(ctx, logger, pod, volumeName, cli)
volDir, volMode, err := getVolumeDirectory(ctx, logger, pod, volumeName, cli)
if err != nil {
return datapath.AccessPoint{}, errors.Wrapf(err, "error getting volume directory name for volume %s in pod %s", volumeName, pod.Name)
}

logger.WithField("volDir", volDir).Info("Got volume dir")

pathGlob := fmt.Sprintf("/host_pods/%s/volumes/*/%s", string(pod.GetUID()), volDir)
volSubDir := "volumes"
if volMode == uploader.PersistentVolumeBlock {
volSubDir = "volumeDevices"
}

pathGlob := fmt.Sprintf("/host_pods/%s/%s/*/%s", string(pod.GetUID()), volSubDir, volDir)
logger.WithField("pathGlob", pathGlob).Debug("Looking for path matching glob")

path, err := singlePathMatch(pathGlob, fs, logger)
Expand All @@ -55,7 +61,9 @@ func GetPodVolumeHostPath(ctx context.Context, pod *corev1.Pod, volumeName strin

logger.WithField("path", path).Info("Found path matching glob")

return datapath.AccessPoint{
ByPath: path,
}, nil
if volMode == uploader.PersistentVolumeBlock {
return datapath.AccessPoint{ByBlock: path}, nil
}

return datapath.AccessPoint{ByPath: path}, nil
}
Loading

0 comments on commit 4aff462

Please sign in to comment.