Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rbd: include trashed parent images while calculating the clone depth #4029

Draft
wants to merge 8 commits into
base: devel
Choose a base branch
from
26 changes: 18 additions & 8 deletions internal/rbd/controllerserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -559,7 +559,9 @@ func flattenTemporaryClonedImages(ctx context.Context, rbdVol *rbdVolume, cr *ut
rbdVol.Monitors,
rbdVol.RbdImageName,
cr)
if err != nil {
if errors.Is(err, ErrFlattenInProgress) {
return status.Error(codes.Aborted, err.Error())
} else if err != nil {
return status.Error(codes.Internal, err.Error())
}

Expand Down Expand Up @@ -995,7 +997,7 @@ func cleanupRBDImage(ctx context.Context,
if inUse {
log.ErrorLog(ctx, "rbd %s is still being used", rbdVol)

return nil, status.Errorf(codes.Internal, "rbd %s is still being used", rbdVol.RbdImageName)
return nil, status.Errorf(codes.FailedPrecondition, "rbd %s is still being used", rbdVol.RbdImageName)
}

// delete the temporary rbd image created as part of volume clone during
Expand All @@ -1017,9 +1019,10 @@ func cleanupRBDImage(ctx context.Context,
}
}

// Deleting rbd image
// Deleting rbd image, it isn't a failure if the image was deleted already.
log.DebugLog(ctx, "deleting image %s", rbdVol.RbdImageName)
if err = rbdVol.deleteImage(ctx); err != nil {
err = rbdVol.deleteImage(ctx)
if err != nil && !errors.Is(err, librbd.ErrNotFound) {
log.ErrorLog(ctx, "failed to delete rbd image: %s with error: %v",
rbdVol, err)

Expand Down Expand Up @@ -1166,13 +1169,18 @@ func (cs *ControllerServer) CreateSnapshot(
}
}()

readyToUse := true

vol, err := cs.doSnapshotClone(ctx, rbdVol, rbdSnap, cr)
if err != nil {
if errors.Is(err, ErrFlattenInProgress) {
readyToUse = false
} else if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}

// Update the metadata on snapshot not on the original image
rbdVol.RbdImageName = rbdSnap.RbdSnapName
rbdVol.ImageID = vol.ImageID
rbdVol.ClusterName = cs.ClusterName

defer func() {
Expand Down Expand Up @@ -1203,7 +1211,7 @@ func (cs *ControllerServer) CreateSnapshot(
SnapshotId: vol.VolID,
SourceVolumeId: req.GetSourceVolumeId(),
CreationTime: vol.CreatedAt,
ReadyToUse: true,
ReadyToUse: readyToUse,
},
}, nil
}
Expand Down Expand Up @@ -1234,10 +1242,12 @@ func cloneFromSnapshot(
return nil, status.Error(codes.Internal, err.Error())
}

readyToUse := true

err = vol.flattenRbdImage(ctx, false, rbdHardMaxCloneDepth, rbdSoftMaxCloneDepth)
if errors.Is(err, ErrFlattenInProgress) {
// if flattening is in progress, return error and do not cleanup
return nil, status.Errorf(codes.Internal, err.Error())
readyToUse = false
} else if err != nil {
uErr := undoSnapshotCloning(ctx, rbdVol, rbdSnap, vol, cr)
if uErr != nil {
Expand All @@ -1263,7 +1273,7 @@ func cloneFromSnapshot(
SnapshotId: rbdSnap.VolID,
SourceVolumeId: rbdSnap.SourceVolumeID,
CreationTime: rbdSnap.CreatedAt,
ReadyToUse: true,
ReadyToUse: readyToUse,
},
}, nil
}
Expand Down
2 changes: 1 addition & 1 deletion internal/rbd/nodeserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -595,7 +595,7 @@ func flattenImageBeforeMapping(
if err != nil {
return err
}
depth, err = volOptions.getCloneDepth(ctx)
depth, err = volOptions.getCloneDepth(rbdHardMaxCloneDepth + 1)
if err != nil {
return err
}
Expand Down
131 changes: 94 additions & 37 deletions internal/rbd/rbd_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,8 @@ type rbdImage struct {
NamePrefix string
// ParentName represents the parent image name of the image.
ParentName string
// ParentID represents the parent image ID of the image.
ParentID string
// Parent Pool is the pool that contains the parent image.
ParentPool string
// Cluster name
Expand Down Expand Up @@ -505,7 +507,14 @@ func (ri *rbdImage) open() (*librbd.Image, error) {
return nil, err
}

image, err := librbd.OpenImage(ri.ioctx, ri.RbdImageName, librbd.NoSnapshot)
var image *librbd.Image

// try to open by id, that works for images in trash too
if ri.ImageID != "" {
image, err = librbd.OpenImageById(ri.ioctx, ri.ImageID, librbd.NoSnapshot)
} else {
image, err = librbd.OpenImage(ri.ioctx, ri.RbdImageName, librbd.NoSnapshot)
}
if err != nil {
if errors.Is(err, librbd.ErrNotFound) {
err = util.JoinErrors(ErrImageNotFound, err)
Expand Down Expand Up @@ -697,47 +706,80 @@ func (ri *rbdImage) trashRemoveImage(ctx context.Context) error {
return nil
}

func (ri *rbdImage) getCloneDepth(ctx context.Context) (uint, error) {
var depth uint
vol := rbdVolume{}
// getCloneDepth walks the parents of the image and returns the number of
// images in the chain. The `maxDepth` argument to the function is used to
// specify a limit of the depth to check. When `maxDepth` depth is reached, no
// further traversing of the depth is required.
//
nixpanic marked this conversation as resolved.
Show resolved Hide resolved
// This function re-uses the ioctx of the image to open all images in the
// chain. There is no need to open new ioctx's for every image.
func (ri *rbdImage) getCloneDepth(maxDepth uint) (uint, error) {
var (
depth uint
info *librbd.ParentInfo
)

vol.Pool = ri.Pool
vol.Monitors = ri.Monitors
vol.RbdImageName = ri.RbdImageName
vol.RadosNamespace = ri.RadosNamespace
vol.conn = ri.conn.Copy()
image, err := ri.open()
if err != nil {
return 0, fmt.Errorf("failed to open image %s: %w", ri, err)
}

// get the librbd.ImageSpec of the parent
info, err = image.GetParent()

// Close this image, it is not used anymore. Using defer to close it
// and replacing the image with an other image can result in resource
// leaks according to golangci-lint.
image.Close()

for {
if vol.RbdImageName == "" {
return depth, nil
if errors.Is(err, librbd.ErrNotFound) {
// image does not have a parent
break
} else if err != nil {
return 0, fmt.Errorf("failed to get parent of image %s at depth %d: %w", ri, depth, err)
}
err := vol.openIoctx()
if err != nil {
return depth, err

// if the parent is in trash, return maxDepth to trigger
// flattening
if info.Image.Trash {
return maxDepth, nil
}

err = vol.getImageInfo()
// FIXME: create and destroy the vol inside the loop.
// see https://github.com/ceph/ceph-csi/pull/1838#discussion_r598530807
vol.ioctx.Destroy()
vol.ioctx = nil
if err != nil {
// if the parent image is moved to trash the name will be present
// in rbd image info but the image will be in trash, in that case
// return the found depth
if errors.Is(err, ErrImageNotFound) {
return depth, nil
}
log.ErrorLog(ctx, "failed to check depth on image %s: %s", &vol, err)
// if there is a parent, count it to the depth
depth++

return depth, err
// if maxDepth (usually hard-limit) is reached, further
// traversing parents is not needed, some action (flattening)
// will be triggered regardless of deeper depth
if depth == maxDepth {
break
}
if vol.ParentName != "" {
depth++

// open the parent image, so that the for-loop can continue
// with checking for the parent of the parent
image, err = librbd.OpenImageByIdReadOnly(ri.ioctx, info.Image.ImageID, librbd.NoSnapshot)
if err != nil && errors.Is(err, librbd.ErrNotFound) {
// parent image does not exist, no parent after all
break
} else if err != nil {
imageSpec := info.Image.ImageID
if info.Snap.SnapName != librbd.NoSnapshot {
imageSpec += "@" + info.Snap.SnapName
}

return 0, fmt.Errorf("failed to open parent image ID %q, at depth %d: %w", imageSpec, depth, err)
}
vol.RbdImageName = vol.ParentName
vol.Pool = vol.ParentPool

info, err = image.GetParent()
// info and err checking is done in the top of the for loop

// Using defer in a for loop seems to be problematic. Just
// always close the image.
image.Close()
}

return depth, nil
}

type trashSnapInfo struct {
Expand Down Expand Up @@ -803,7 +845,7 @@ func (ri *rbdImage) flattenRbdImage(

// skip clone depth check if request is for force flatten
if !forceFlatten {
depth, err = ri.getCloneDepth(ctx)
depth, err = ri.getCloneDepth(hardlimit)
if err != nil {
return err
}
Expand All @@ -830,7 +872,9 @@ func (ri *rbdImage) flattenRbdImage(
_, err = ta.AddFlatten(admin.NewImageSpec(ri.Pool, ri.RadosNamespace, ri.RbdImageName))
rbdCephMgrSupported := isCephMgrSupported(ctx, ri.ClusterID, err)
if rbdCephMgrSupported {
if err != nil {
// it is not possible to flatten an image that is in trash (ErrNotFound)
switch {
case err != nil && !errors.Is(err, rados.ErrNotFound):
// discard flattening error if the image does not have any parent
rbdFlattenNoParent := fmt.Sprintf("Image %s/%s does not have a parent", ri.Pool, ri.RbdImageName)
if strings.Contains(err.Error(), rbdFlattenNoParent) {
Expand All @@ -839,11 +883,11 @@ func (ri *rbdImage) flattenRbdImage(
log.ErrorLog(ctx, "failed to add task flatten for %s : %v", ri, err)

return err
}
if forceFlatten || depth >= hardlimit {
case forceFlatten || depth >= hardlimit:
return fmt.Errorf("%w: flatten is in progress for image %s", ErrFlattenInProgress, ri.RbdImageName)
default:
log.DebugLog(ctx, "successfully added task to flatten image %q", ri)
}
log.DebugLog(ctx, "successfully added task to flatten image %q", ri)
}
if !rbdCephMgrSupported {
log.ErrorLog(
Expand Down Expand Up @@ -875,6 +919,9 @@ func (ri *rbdImage) getParentName() (string, error) {
return "", err
}

ri.ParentName = parentInfo.Image.ImageName
ri.ParentID = parentInfo.Image.ImageID

return parentInfo.Image.ImageName, nil
}

Expand Down Expand Up @@ -1588,6 +1635,11 @@ func (ri *rbdImage) getImageInfo() error {
// TODO: can rv.VolSize not be a uint64? Or initialize it to -1?
ri.VolSize = int64(imageInfo.Size)

ri.ImageID, err = image.GetId()
if err != nil {
return err
}

features, err := image.GetFeatures()
if err != nil {
return err
Expand All @@ -1601,11 +1653,13 @@ func (ri *rbdImage) getImageInfo() error {
// the parent is an error or not.
if errors.Is(err, librbd.ErrNotFound) {
ri.ParentName = ""
ri.ParentID = ""
} else {
return err
}
} else {
ri.ParentName = parentInfo.Image.ImageName
ri.ParentID = parentInfo.Image.ImageID
ri.ParentPool = parentInfo.Image.PoolName
}
// Get image creation time
Expand Down Expand Up @@ -1636,6 +1690,7 @@ func (ri *rbdImage) getParent() (*rbdImage, error) {
parentImage.Pool = ri.ParentPool
parentImage.RadosNamespace = ri.RadosNamespace
parentImage.RbdImageName = ri.ParentName
parentImage.ImageID = ri.ParentID

err = parentImage.getImageInfo()
if err != nil {
Expand Down Expand Up @@ -1692,6 +1747,7 @@ type rbdImageMetadataStash struct {
Pool string `json:"pool"`
RadosNamespace string `json:"radosNamespace"`
ImageName string `json:"image"`
ImageID string `json:"imageID"`
UnmapOptions string `json:"unmapOptions"`
NbdAccess bool `json:"accessType"`
Encrypted bool `json:"encrypted"`
Expand Down Expand Up @@ -1721,6 +1777,7 @@ func stashRBDImageMetadata(volOptions *rbdVolume, metaDataPath string) error {
Pool: volOptions.Pool,
RadosNamespace: volOptions.RadosNamespace,
ImageName: volOptions.RbdImageName,
ImageID: volOptions.ImageID,
Encrypted: volOptions.isBlockEncrypted(),
UnmapOptions: volOptions.UnmapOptions,
}
Expand Down
12 changes: 11 additions & 1 deletion internal/rbd/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,17 @@ func generateVolFromSnap(rbdSnap *rbdSnapshot) *rbdVolume {
vol.JournalPool = rbdSnap.JournalPool
vol.RadosNamespace = rbdSnap.RadosNamespace
vol.RbdImageName = rbdSnap.RbdSnapName
vol.ImageID = rbdSnap.ImageID
vol.ParentName = rbdSnap.ParentName
vol.ParentID = rbdSnap.ParentID

// /!\ WARNING /!\
//
// Do not set the ImageID to the ID of the snapshot, a new image will
// be created based on the returned rbdVolume. If the ImageID is set to
// the ID of the snapshot, accessing the new image by ID will actually
// access the snapshot!
// vol.ImageID = rbdSnap.ImageID

// copyEncryptionConfig cannot be used here because the volume and the
// snapshot will have the same volumeID which cases the panic in
// copyEncryptionConfig function.
Expand Down