diff --git a/pkg/rbd/controllerserver.go b/pkg/rbd/controllerserver.go index de90ed5482a4..a9b73d7d9c2c 100644 --- a/pkg/rbd/controllerserver.go +++ b/pkg/rbd/controllerserver.go @@ -125,6 +125,8 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol return nil, err } + // TODO: create/get a connection from the the ConnPool, and do not pass + // the credentials to any of the utility functions. cr, err := util.NewUserCredentials(req.GetSecrets()) if err != nil { return nil, status.Error(codes.Internal, err.Error()) @@ -135,6 +137,7 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol if err != nil { return nil, err } + defer rbdVol.Destroy() // Existence and conflict checks if acquired := cs.VolumeLocks.TryAcquire(req.GetName()); !acquired { @@ -294,6 +297,7 @@ func (cs *ControllerServer) DeleteLegacyVolume(ctx context.Context, req *csi.Del defer cs.VolumeLocks.Release(volumeID) rbdVol := &rbdVolume{} + defer rbdVol.Destroy() if err := cs.MetadataStore.Get(volumeID, rbdVol); err != nil { if err, ok := err.(*util.CacheEntryNotFound); ok { klog.V(3).Infof(util.Log(ctx, "metadata for legacy volume %s not found, assuming the volume to be already deleted (%v)"), volumeID, err) @@ -352,6 +356,7 @@ func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol defer cs.VolumeLocks.Release(volumeID) rbdVol := &rbdVolume{} + defer rbdVol.Destroy() if err = genVolFromVolID(ctx, rbdVol, volumeID, cr, req.GetSecrets()); err != nil { if _, ok := err.(util.ErrPoolNotFound); ok { klog.Warningf(util.Log(ctx, "failed to get backend volume for %s: %v"), volumeID, err) @@ -742,6 +747,7 @@ func (cs *ControllerServer) ControllerExpandVolume(ctx context.Context, req *csi defer cr.DeleteCredentials() rbdVol := &rbdVolume{} + defer rbdVol.Destroy() err = genVolFromVolID(ctx, rbdVol, volID, cr, req.GetSecrets()) if err != nil { if _, ok := err.(ErrImageNotFound); ok { diff --git a/pkg/rbd/rbd_util.go b/pkg/rbd/rbd_util.go index 0bc1e76df454..f613c217df93 100644 --- a/pkg/rbd/rbd_util.go +++ b/pkg/rbd/rbd_util.go @@ -30,6 +30,8 @@ import ( "github.com/ceph/ceph-csi/pkg/util" + "github.com/ceph/go-ceph/rados" + librbd "github.com/ceph/go-ceph/rbd" "github.com/golang/protobuf/ptypes" "github.com/golang/protobuf/ptypes/timestamp" "github.com/pborman/uuid" @@ -40,7 +42,7 @@ import ( const ( imageWatcherStr = "watcher=" - rbdImageFormat2 = "2" + rbdImageFormat2 = 2 // The following three values are used for 30 seconds timeout // while waiting for RBD Watcher to expire. rbdImageWatcherInitDelay = 1 * time.Second @@ -88,6 +90,9 @@ type rbdVolume struct { DisableInUseChecks bool `json:"disableInUseChecks"` Encrypted bool KMS util.EncryptionKMS + + // connection + conn *rados.Conn } // rbdSnapshot represents a CSI snapshot and its RBD snapshot specifics @@ -112,36 +117,92 @@ type rbdSnapshot struct { var ( supportedFeatures = sets.NewString("layering") + + // large interval and timeout, it should be longer than the maximum + // time an operation can take (until refcounting of the connections is + // available) + cpInterval = 15 * time.Minute + cpExpiry = 10 * time.Minute + connPool = util.NewConnPool(cpInterval, cpExpiry) ) // createImage creates a new ceph image with provision and volume options. func createImage(ctx context.Context, pOpts *rbdVolume, volSz int64, cr *util.Credentials) error { - var output []byte - - image := pOpts.RbdImageName volSzMiB := fmt.Sprintf("%dM", volSz) + options := librbd.NewRbdImageOptions() + + var err error + imageFormat := rbdImageFormat2 + if pOpts.ImageFormat != "" { + imageFormat, err = strconv.Atoi(pOpts.ImageFormat) + if err != nil { + return errors.Wrapf(err, "failed to convert ImageFormat (%v) to integer", pOpts.ImageFormat) + } + } - logMsg := "rbd: create %s size %s format 2 (features: %s) using mon %s, pool %s " + err = options.SetUint64(librbd.RbdImageOptionFormat, uint64(imageFormat)) + if err != nil { + return errors.Wrapf(err, "failed to set ImageFormat to %v", imageFormat) + } + + logMsg := "rbd: create %s size %s format %d (features: %s) using mon %s, pool %s " if pOpts.DataPool != "" { logMsg += fmt.Sprintf("data pool %s", pOpts.DataPool) + err = options.SetString(librbd.RbdImageOptionDataPool, pOpts.DataPool) + if err != nil { + return errors.Wrapf(err, "failed to set data pool") + } } klog.V(4).Infof(util.Log(ctx, logMsg), - image, volSzMiB, pOpts.ImageFeatures, pOpts.Monitors, pOpts.Pool) + pOpts.RbdImageName, volSzMiB, imageFormat, pOpts.ImageFeatures, pOpts.Monitors, pOpts.Pool) - args := []string{"create", image, "--size", volSzMiB, "--pool", pOpts.Pool, "--id", cr.ID, "-m", pOpts.Monitors, "--keyfile=" + cr.KeyFile, "--image-feature", pOpts.ImageFeatures} + if pOpts.ImageFeatures != "" { + features := imageFeaturesToUint64(ctx, pOpts.ImageFeatures) + err := options.SetUint64(librbd.RbdImageOptionFeatures, features) + if err != nil { + return errors.Wrapf(err, "failed to set image features") + } + } - if pOpts.DataPool != "" { - args = append(args, "--data-pool", pOpts.DataPool) + ioctx, err := pOpts.getIoctx(cr) + if err != nil { + return errors.Wrapf(err, "failed to get IOContext") } - output, err := execCommand("rbd", args) + defer ioctx.Destroy() + err = librbd.CreateImage(ioctx, pOpts.RbdImageName, uint64(volSz*util.MiB), options) if err != nil { - return errors.Wrapf(err, "failed to create rbd image, command output: %s", string(output)) + return errors.Wrapf(err, "failed to create rbd image") } return nil } +func (rv *rbdVolume) getIoctx(cr *util.Credentials) (*rados.IOContext, error) { + if rv.conn == nil { + conn, err := connPool.Get(rv.Pool, rv.Monitors, cr.KeyFile) + if err != nil { + return nil, errors.Wrapf(err, "failed to get connection") + } + + rv.conn = conn + } + + ioctx, err := rv.conn.OpenIOContext(rv.Pool) + if err != nil { + connPool.Put(rv.conn) + return nil, errors.Wrapf(err, "failed to open IOContext for pool %s", rv.Pool) + } + + return ioctx, nil +} + +func (rv *rbdVolume) Destroy() { + if rv.conn != nil { + connPool.Put(rv.conn) + } +} + // rbdStatus checks if there is watcher on the image. // It returns true if there is a watcher on the image, otherwise returns false. func rbdStatus(ctx context.Context, pOpts *rbdVolume, cr *util.Credentials) (bool, string, error) { @@ -270,7 +331,7 @@ func updateVolWithImageInfo(ctx context.Context, rbdVol *rbdVolume, cr *util.Cre return fmt.Errorf("unknown or unsupported image format (%d) returned for image (%s)", imageInfo.Format, rbdVol.RbdImageName) } - rbdVol.ImageFormat = rbdImageFormat2 + rbdVol.ImageFormat = strconv.Itoa(rbdImageFormat2) rbdVol.VolSize = imageInfo.Size rbdVol.ImageFeatures = strings.Join(imageInfo.Features, ",") @@ -558,6 +619,21 @@ func hasSnapshotFeature(imageFeatures string) bool { return false } +// imageFeaturesToUint64 takes the comma separated image features and converts +// them to a RbdImageOptionFeatures value. +func imageFeaturesToUint64(ctx context.Context, imageFeatures string) uint64 { + features := uint64(0) + + for _, f := range strings.Split(imageFeatures, ",") { + if f == "layering" { + features |= librbd.RbdFeatureLayering + } else { + klog.Warningf(util.Log(ctx, "rbd: image feature %s not recognized, skipping"), f) + } + } + return features +} + func protectSnapshot(ctx context.Context, pOpts *rbdSnapshot, cr *util.Credentials) error { var output []byte