diff --git a/internal/csi-addons/rbd/reclaimspace.go b/internal/csi-addons/rbd/reclaimspace.go index 97a5ea02523f..1829ae57d1ba 100644 --- a/internal/csi-addons/rbd/reclaimspace.go +++ b/internal/csi-addons/rbd/reclaimspace.go @@ -37,12 +37,14 @@ import ( // of CSI-addons reclaimspace controller service spec. type ReclaimSpaceControllerServer struct { *rs.UnimplementedReclaimSpaceControllerServer + // Embed ControllerServer as it implements helper functions + *rbdutil.ControllerServer } // NewReclaimSpaceControllerServer creates a new ReclaimSpaceControllerServer which handles // the ReclaimSpace Service requests from the CSI-Addons specification. -func NewReclaimSpaceControllerServer() *ReclaimSpaceControllerServer { - return &ReclaimSpaceControllerServer{} +func NewReclaimSpaceControllerServer(c *rbdutil.ControllerServer) *ReclaimSpaceControllerServer { + return &ReclaimSpaceControllerServer{ControllerServer: c} } func (rscs *ReclaimSpaceControllerServer) RegisterService(server grpc.ServiceRegistrar) { @@ -64,6 +66,13 @@ func (rscs *ReclaimSpaceControllerServer) ControllerReclaimSpace( } defer cr.DeleteCredentials() + if acquired := rscs.VolumeLocks.TryAcquire(volumeID); !acquired { + log.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, volumeID) + + return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volumeID) + } + defer rscs.VolumeLocks.Release(volumeID) + rbdVol, err := rbdutil.GenVolFromVolID(ctx, volumeID, cr, req.GetSecrets()) if err != nil { return nil, status.Errorf(codes.Aborted, "failed to find volume with ID %q: %s", volumeID, err.Error()) @@ -90,12 +99,14 @@ func (rscs *ReclaimSpaceControllerServer) ControllerReclaimSpace( // of CSI-addons reclaimspace controller service spec. type ReclaimSpaceNodeServer struct { *rs.UnimplementedReclaimSpaceNodeServer + // Embed NodeServer as it implements helper functions + *rbdutil.NodeServer } // NewReclaimSpaceNodeServer creates a new IdentityServer which handles the // Identity Service requests from the CSI-Addons specification. -func NewReclaimSpaceNodeServer() *ReclaimSpaceNodeServer { - return &ReclaimSpaceNodeServer{} +func NewReclaimSpaceNodeServer(n *rbdutil.NodeServer) *ReclaimSpaceNodeServer { + return &ReclaimSpaceNodeServer{NodeServer: n} } func (rsns *ReclaimSpaceNodeServer) RegisterService(server grpc.ServiceRegistrar) { @@ -116,6 +127,13 @@ func (rsns *ReclaimSpaceNodeServer) NodeReclaimSpace( return nil, status.Error(codes.InvalidArgument, "empty volume ID in request") } + if acquired := rsns.VolumeLocks.TryAcquire(volumeID); !acquired { + log.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, volumeID) + + return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volumeID) + } + defer rsns.VolumeLocks.Release(volumeID) + // path can either be the staging path on the node, or the volume path // inside an application container path := req.GetStagingTargetPath() diff --git a/internal/csi-addons/rbd/reclaimspace_test.go b/internal/csi-addons/rbd/reclaimspace_test.go index 6effbc0fa0f4..3161d662dfe1 100644 --- a/internal/csi-addons/rbd/reclaimspace_test.go +++ b/internal/csi-addons/rbd/reclaimspace_test.go @@ -22,6 +22,8 @@ import ( rs "github.com/csi-addons/spec/lib/go/reclaimspace" "github.com/stretchr/testify/require" + + rbdutil "github.com/ceph/ceph-csi/internal/rbd" ) // TestControllerReclaimSpace is a minimal test for the @@ -30,7 +32,7 @@ import ( func TestControllerReclaimSpace(t *testing.T) { t.Parallel() - controller := NewReclaimSpaceControllerServer() + controller := NewReclaimSpaceControllerServer(&rbdutil.ControllerServer{}) req := &rs.ControllerReclaimSpaceRequest{ VolumeId: "", @@ -47,7 +49,7 @@ func TestControllerReclaimSpace(t *testing.T) { func TestNodeReclaimSpace(t *testing.T) { t.Parallel() - node := NewReclaimSpaceNodeServer() + node := NewReclaimSpaceNodeServer(&rbdutil.NodeServer{}) req := &rs.NodeReclaimSpaceRequest{ VolumeId: "", diff --git a/internal/rbd/driver/driver.go b/internal/rbd/driver/driver.go index c4c736c3447d..743d7366b8e9 100644 --- a/internal/rbd/driver/driver.go +++ b/internal/rbd/driver/driver.go @@ -70,14 +70,14 @@ func NewNodeServer( d *csicommon.CSIDriver, t string, nodeLabels, topology, crushLocationMap map[string]string, -) (*rbd.NodeServer, error) { +) *rbd.NodeServer { cliReadAffinityMapOptions := util.ConstructReadAffinityMapOption(crushLocationMap) ns := rbd.NodeServer{ DefaultNodeServer: csicommon.NewDefaultNodeServer(d, t, cliReadAffinityMapOptions, topology, nodeLabels), VolumeLocks: util.NewVolumeLocks(), } - return &ns, nil + return &ns } // Run start a non-blocking grpc controller,node and identityserver for @@ -144,10 +144,8 @@ func (r *Driver) Run(conf *util.Config) { if err != nil { log.FatalLogMsg(err.Error()) } - r.ns, err = NewNodeServer(r.cd, conf.Vtype, nodeLabels, topology, crushLocationMap) - if err != nil { - log.FatalLogMsg("failed to start node server, err %v\n", err) - } + r.ns = NewNodeServer(r.cd, conf.Vtype, nodeLabels, topology, crushLocationMap) + var attr string attr, err = rbd.GetKrbdSupportedFeatures() if err != nil && !errors.Is(err, os.ErrNotExist) { @@ -213,7 +211,9 @@ func (r *Driver) setupCSIAddonsServer(conf *util.Config) error { r.cas.RegisterService(is) if conf.IsControllerServer { - rs := casrbd.NewReclaimSpaceControllerServer() + rs := casrbd.NewReclaimSpaceControllerServer(&rbd.ControllerServer{ + VolumeLocks: util.NewVolumeLocks(), + }) r.cas.RegisterService(rs) fcs := casrbd.NewFenceControllerServer() @@ -224,7 +224,9 @@ func (r *Driver) setupCSIAddonsServer(conf *util.Config) error { } if conf.IsNodeServer { - rs := casrbd.NewReclaimSpaceNodeServer() + rs := casrbd.NewReclaimSpaceNodeServer(&rbd.NodeServer{ + VolumeLocks: util.NewVolumeLocks(), + }) r.cas.RegisterService(rs) }