Skip to content

Commit

Permalink
feat(clone): add support for creating the Clone from volume as dataso…
Browse files Browse the repository at this point in the history
…urce

We can now create the Clone from pvc directly

```
kind: PersistentVolumeClaim
apiVersion: v1
metadata:
  name: pvc-clone
spec:
  storageClassName: openebs-snap
  dataSource:
    name: pvc-snap
    kind: PersistentVolumeClaim
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 4Gi
```
The ZFS_LocalPV driver will create one internal snapshot of the name
same as the new volume name and will create a clone out of it. Also,
while destroying the volume the driver will take care of deleting
the created snapshot for the clone.

Signed-off-by: Pawan <[email protected]>
  • Loading branch information
pawanpraka1 committed Nov 10, 2020
1 parent 64bc7cb commit e78de3f
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 3 deletions.
1 change: 1 addition & 0 deletions changelogs/unreleased/234-pawanpraka1
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
add support for creating the Clone from volume as datasource
57 changes: 54 additions & 3 deletions pkg/driver/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,56 @@ func CreateZFSVolume(req *csi.CreateVolumeRequest) (string, error) {
return selected, nil
}

// CreateZFSClone create a clone of zfs volume
func CreateZFSClone(req *csi.CreateVolumeRequest, snapshot string) (string, error) {
// CreateVolClone creates the clone from a volume
func CreateVolClone(req *csi.CreateVolumeRequest, srcVol string) (string, error) {
volName := req.GetName()
parameters := req.GetParameters()
// lower case keys, cf CreateZFSVolume()
pool := helpers.GetInsensitiveParameter(&parameters, "poolname")
size := getRoundedCapacity(req.GetCapacityRange().RequiredBytes)
volsize := strconv.FormatInt(int64(size), 10)

vol, err := zfs.GetZFSVolume(srcVol)
if err != nil {
return "", status.Error(codes.NotFound, err.Error())
}

if vol.Spec.PoolName != pool {
return "", status.Errorf(codes.Internal,
"vol clone to a different pool src pool %s dst pool %s",
vol.Spec.PoolName, pool)
}

if vol.Spec.Capacity != volsize {
return "", status.Error(codes.Internal, "vol clone volume size is not matching")
}

selected := vol.Spec.OwnerNodeID

labels := map[string]string{zfs.ZFSVolKey: vol.Name}

// create the clone from the source volume

volObj, err := volbuilder.NewBuilder().
WithName(volName).
WithVolumeStatus(zfs.ZFSStatusPending).
WithLabels(labels).Build()

volObj.Spec = vol.Spec
// use the snapshot name same as new volname
volObj.Spec.SnapName = vol.Name + "@" + volName

err = zfs.ProvisionVolume(volObj)
if err != nil {
return "", status.Errorf(codes.Internal,
"clone: not able to provision the volume %s", err.Error())
}

return selected, nil
}

// CreateSnapClone creates the clone from a snapshot
func CreateSnapClone(req *csi.CreateVolumeRequest, snapshot string) (string, error) {

volName := req.GetName()
parameters := req.GetParameters()
Expand Down Expand Up @@ -243,7 +291,10 @@ func (cs *controller) CreateVolume(
if contentSource != nil && contentSource.GetSnapshot() != nil {
snapshotID := contentSource.GetSnapshot().GetSnapshotId()

selected, err = CreateZFSClone(req, snapshotID)
selected, err = CreateSnapClone(req, snapshotID)
} else if contentSource != nil && contentSource.GetVolume() != nil {
srcVol := contentSource.GetVolume().GetVolumeId()
selected, err = CreateVolClone(req, srcVol)
} else {
selected, err = CreateZFSVolume(req)
}
Expand Down
41 changes: 41 additions & 0 deletions pkg/zfs/zfs_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,26 @@ func CreateVolume(vol *apis.ZFSVolume) error {
func CreateClone(vol *apis.ZFSVolume) error {
volume := vol.Spec.PoolName + "/" + vol.Name

if srcVol, ok := vol.Labels[ZFSVolKey]; ok {
// datasource is volume, create the snapshot first
snap := &apis.ZFSSnapshot{}
snap.Name = vol.Name // use volname as snapname
snap.Spec = vol.Spec
// add src vol name
snap.Labels = map[string]string{ZFSVolKey: srcVol}

klog.Infof("creating snapshot %s@%s for the clone %s", srcVol, snap.Name, volume)

err := CreateSnapshot(snap)

if err != nil {
klog.Errorf(
"zfs: could not create snapshot for the clone vol %s snap %s err %v", volume, snap.Name, err,
)
return err
}
}

if err := getVolume(volume); err != nil {
var args []string
args = buildCloneCreateArgs(vol)
Expand Down Expand Up @@ -580,6 +600,27 @@ func DestroyVolume(vol *apis.ZFSVolume) error {
)
return err
}

if srcVol, ok := vol.Labels[ZFSVolKey]; ok {
// datasource is volume, delete the dependent snapshot
snap := &apis.ZFSSnapshot{}
snap.Name = vol.Name // snapname is same as volname
snap.Spec = vol.Spec
// add src vol name
snap.Labels = map[string]string{ZFSVolKey: srcVol}

klog.Infof("destroying snapshot %s@%s for the clone %s", srcVol, snap.Name, volume)

err := DestroySnapshot(snap)

if err != nil {
// no need to reconcile as volume has already been deleted
klog.Errorf(
"zfs: could not destroy snapshot for the clone vol %s snap %s err %v", volume, snap.Name, err,
)
}
}

klog.Infof("destroyed volume %s", volume)

return nil
Expand Down

0 comments on commit e78de3f

Please sign in to comment.