Skip to content

Commit

Permalink
rbd: use go-ceph API for creating RBD images
Browse files Browse the repository at this point in the history
This is the initial step for improving performance during provisioning
of CSI volumes backed by RBD.

While creating a volume, an existing connection to the Ceph cluster is
used from the ConnPool. This should speed up the creation of a batch of
volumes significantly.

Updates: ceph#449
Signed-off-by: Niels de Vos <[email protected]>
  • Loading branch information
nixpanic committed Mar 9, 2020
1 parent 80ec4cd commit 0088c94
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 12 deletions.
6 changes: 6 additions & 0 deletions pkg/rbd/controllerserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,8 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
return nil, err
}

// TODO: create/get a connection from the the ConnPool, and do not pass
// the credentials to any of the utility functions.
cr, err := util.NewUserCredentials(req.GetSecrets())
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
Expand All @@ -135,6 +137,7 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
if err != nil {
return nil, err
}
defer rbdVol.Destroy()

// Existence and conflict checks
if acquired := cs.VolumeLocks.TryAcquire(req.GetName()); !acquired {
Expand Down Expand Up @@ -294,6 +297,7 @@ func (cs *ControllerServer) DeleteLegacyVolume(ctx context.Context, req *csi.Del
defer cs.VolumeLocks.Release(volumeID)

rbdVol := &rbdVolume{}
defer rbdVol.Destroy()
if err := cs.MetadataStore.Get(volumeID, rbdVol); err != nil {
if err, ok := err.(*util.CacheEntryNotFound); ok {
klog.V(3).Infof(util.Log(ctx, "metadata for legacy volume %s not found, assuming the volume to be already deleted (%v)"), volumeID, err)
Expand Down Expand Up @@ -352,6 +356,7 @@ func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
defer cs.VolumeLocks.Release(volumeID)

rbdVol := &rbdVolume{}
defer rbdVol.Destroy()
if err = genVolFromVolID(ctx, rbdVol, volumeID, cr, req.GetSecrets()); err != nil {
if _, ok := err.(util.ErrPoolNotFound); ok {
klog.Warningf(util.Log(ctx, "failed to get backend volume for %s: %v"), volumeID, err)
Expand Down Expand Up @@ -742,6 +747,7 @@ func (cs *ControllerServer) ControllerExpandVolume(ctx context.Context, req *csi
defer cr.DeleteCredentials()

rbdVol := &rbdVolume{}
defer rbdVol.Destroy()
err = genVolFromVolID(ctx, rbdVol, volID, cr, req.GetSecrets())
if err != nil {
if _, ok := err.(ErrImageNotFound); ok {
Expand Down
100 changes: 88 additions & 12 deletions pkg/rbd/rbd_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ import (

"github.com/ceph/ceph-csi/pkg/util"

"github.com/ceph/go-ceph/rados"
librbd "github.com/ceph/go-ceph/rbd"
"github.com/golang/protobuf/ptypes"
"github.com/golang/protobuf/ptypes/timestamp"
"github.com/pborman/uuid"
Expand All @@ -40,7 +42,7 @@ import (

const (
imageWatcherStr = "watcher="
rbdImageFormat2 = "2"
rbdImageFormat2 = 2
// The following three values are used for 30 seconds timeout
// while waiting for RBD Watcher to expire.
rbdImageWatcherInitDelay = 1 * time.Second
Expand Down Expand Up @@ -88,6 +90,9 @@ type rbdVolume struct {
DisableInUseChecks bool `json:"disableInUseChecks"`
Encrypted bool
KMS util.EncryptionKMS

// connection
conn *rados.Conn
}

// rbdSnapshot represents a CSI snapshot and its RBD snapshot specifics
Expand All @@ -112,36 +117,92 @@ type rbdSnapshot struct {

var (
supportedFeatures = sets.NewString("layering")

// large interval and timeout, it should be longer than the maximum
// time an operation can take (until refcounting of the connections is
// available)
cpInterval = 15 * time.Minute
cpExpiry = 10 * time.Minute
connPool = util.NewConnPool(cpInterval, cpExpiry)
)

// createImage creates a new ceph image with provision and volume options.
func createImage(ctx context.Context, pOpts *rbdVolume, volSz int64, cr *util.Credentials) error {
var output []byte

image := pOpts.RbdImageName
volSzMiB := fmt.Sprintf("%dM", volSz)
options := librbd.NewRbdImageOptions()

var err error
imageFormat := rbdImageFormat2
if pOpts.ImageFormat != "" {
imageFormat, err = strconv.Atoi(pOpts.ImageFormat)
if err != nil {
return errors.Wrapf(err, "failed to convert ImageFormat (%v) to integer", pOpts.ImageFormat)
}
}

logMsg := "rbd: create %s size %s format 2 (features: %s) using mon %s, pool %s "
err = options.SetUint64(librbd.RbdImageOptionFormat, uint64(imageFormat))
if err != nil {
return errors.Wrapf(err, "failed to set ImageFormat to %v", imageFormat)
}

logMsg := "rbd: create %s size %s format %d (features: %s) using mon %s, pool %s "
if pOpts.DataPool != "" {
logMsg += fmt.Sprintf("data pool %s", pOpts.DataPool)
err = options.SetString(librbd.RbdImageOptionDataPool, pOpts.DataPool)
if err != nil {
return errors.Wrapf(err, "failed to set data pool")
}
}
klog.V(4).Infof(util.Log(ctx, logMsg),
image, volSzMiB, pOpts.ImageFeatures, pOpts.Monitors, pOpts.Pool)
pOpts.RbdImageName, volSzMiB, imageFormat, pOpts.ImageFeatures, pOpts.Monitors, pOpts.Pool)

args := []string{"create", image, "--size", volSzMiB, "--pool", pOpts.Pool, "--id", cr.ID, "-m", pOpts.Monitors, "--keyfile=" + cr.KeyFile, "--image-feature", pOpts.ImageFeatures}
if pOpts.ImageFeatures != "" {
features := imageFeaturesToUint64(ctx, pOpts.ImageFeatures)
err := options.SetUint64(librbd.RbdImageOptionFeatures, features)
if err != nil {
return errors.Wrapf(err, "failed to set image features")
}
}

if pOpts.DataPool != "" {
args = append(args, "--data-pool", pOpts.DataPool)
ioctx, err := pOpts.getIoctx(cr)
if err != nil {
return errors.Wrapf(err, "failed to get IOContext")
}
output, err := execCommand("rbd", args)
defer ioctx.Destroy()

err = librbd.CreateImage(ioctx, pOpts.RbdImageName, uint64(volSz*util.MiB), options)
if err != nil {
return errors.Wrapf(err, "failed to create rbd image, command output: %s", string(output))
return errors.Wrapf(err, "failed to create rbd image")
}

return nil
}

func (rv *rbdVolume) getIoctx(cr *util.Credentials) (*rados.IOContext, error) {
if rv.conn == nil {
conn, err := connPool.Get(rv.Pool, rv.Monitors, cr.KeyFile)
if err != nil {
return nil, errors.Wrapf(err, "failed to get connection")
}

rv.conn = conn
}

ioctx, err := rv.conn.OpenIOContext(rv.Pool)
if err != nil {
connPool.Put(rv.conn)
return nil, errors.Wrapf(err, "failed to open IOContext for pool %s", rv.Pool)
}

return ioctx, nil
}

func (rv *rbdVolume) Destroy() {
if rv.conn != nil {
connPool.Put(rv.conn)
}
}

// rbdStatus checks if there is watcher on the image.
// It returns true if there is a watcher on the image, otherwise returns false.
func rbdStatus(ctx context.Context, pOpts *rbdVolume, cr *util.Credentials) (bool, string, error) {
Expand Down Expand Up @@ -270,7 +331,7 @@ func updateVolWithImageInfo(ctx context.Context, rbdVol *rbdVolume, cr *util.Cre
return fmt.Errorf("unknown or unsupported image format (%d) returned for image (%s)",
imageInfo.Format, rbdVol.RbdImageName)
}
rbdVol.ImageFormat = rbdImageFormat2
rbdVol.ImageFormat = strconv.Itoa(rbdImageFormat2)

rbdVol.VolSize = imageInfo.Size
rbdVol.ImageFeatures = strings.Join(imageInfo.Features, ",")
Expand Down Expand Up @@ -558,6 +619,21 @@ func hasSnapshotFeature(imageFeatures string) bool {
return false
}

// imageFeaturesToUint64 takes the comma separated image features and converts
// them to a RbdImageOptionFeatures value.
func imageFeaturesToUint64(ctx context.Context, imageFeatures string) uint64 {
features := uint64(0)

for _, f := range strings.Split(imageFeatures, ",") {
if f == "layering" {
features |= librbd.RbdFeatureLayering
} else {
klog.Warningf(util.Log(ctx, "rbd: image feature %s not recognized, skipping"), f)
}
}
return features
}

func protectSnapshot(ctx context.Context, pOpts *rbdSnapshot, cr *util.Credentials) error {
var output []byte

Expand Down

0 comments on commit 0088c94

Please sign in to comment.