diff --git a/pkg/nfs/controllerserver.go b/pkg/nfs/controllerserver.go
index 0bbbb4235..83e56afc0 100644
--- a/pkg/nfs/controllerserver.go
+++ b/pkg/nfs/controllerserver.go
@@ -1,6 +1,11 @@
 package nfs
 
 import (
+	"fmt"
+	"os"
+	"path/filepath"
+	"strings"
+
 	"github.com/container-storage-interface/spec/lib/go/csi"
 	"github.com/golang/glog"
 	"golang.org/x/net/context"
@@ -10,14 +15,136 @@ import (
 
 type ControllerServer struct {
 	Driver *nfsDriver
+	// Working directory for the provisioner to temporarily mount nfs shares at
+	workingMountDir string
+}
+
+// nfsVolume is an internal representation of a volume
+// created by the provisioner.
+type nfsVolume struct {
+	// Volume id
+	id string
+	// Address of the NFS server.
+	// Matches paramServer.
+	server string
+	// Base directory of the NFS server to create volumes under.
+	// Matches paramBaseDir.
+	baseDir string
+	// Subdirectory of the NFS server to create volumes under
+	subDir string
+}
+
+// CSI CreateVolume parameters
+const (
+	// Address of the NFS server
+	paramServer = "server"
+	// Base directory of the NFS server to create volumes under.
+	// The base directory must be a direct child of the root directory.
+	// The root directory is omitted from the string, for example:
+	// "base" instead of "/base"
+	paramBaseDir = "base-dir"
+)
+
+// CSI Volume attributes
+const (
+	// Address of the NFS server
+	attrServer = "server"
+	// Path to the NFS share on the server
+	attrShare = "share"
+)
+
+// Ordering of elements in the CSI volume id.
+// ID is of the form {server}/{baseDir}/{subDir}.
+// TODO: This volume id format limits baseDir and
+// subDir to only be one directory deep.
+// Adding a new element should always go at the end,
+// before totalIDElements.
+const (
+	idServer = iota
+	idBaseDir
+	idSubDir
+	totalIDElements // Always last
+)
+
+func NewControllerServer(d *nfsDriver, workingDir string) *ControllerServer {
+	return &ControllerServer{
+		Driver:          d,
+		workingMountDir: workingDir,
+	}
 }
 
 func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
-	return nil, status.Error(codes.Unimplemented, "")
+	glog.V(4).Infof("CreateVolume called with request %+v", req)
+
+	// Validate arguments
+	name := req.GetName()
+	if len(name) == 0 {
+		return nil, status.Error(codes.InvalidArgument, "CreateVolume name must be provided")
+	}
+
+	if err := cs.validateVolumeCapabilities(req.GetVolumeCapabilities()); err != nil {
+		return nil, status.Error(codes.InvalidArgument, err.Error())
+	}
+
+	nfsVol, err := cs.newNFSVolume(name, req.GetParameters())
+	if err != nil {
+		return nil, status.Error(codes.InvalidArgument, err.Error())
+	}
+
+	// Mount nfs base share so we can create a subdirectory
+	if err = cs.internalMount(ctx, nfsVol); err != nil {
+		return nil, status.Errorf(codes.Internal, "failed to mount nfs server: %v", err.Error())
+	}
+	defer func() {
+		if err = cs.internalUnmount(ctx, nfsVol); err != nil {
+			glog.Warningf("failed to unmount nfs server: %v", err.Error())
+		}
+	}()
+
+	// Create subdirectory under base-dir
+	// TODO: revisit permissions
+	internalVolumePath := cs.getInternalVolumePath(nfsVol)
+	glog.V(4).Infof("Creating subdirectory at %v", internalVolumePath)
+	if err = os.Mkdir(internalVolumePath, 0755); err != nil {
+		return nil, status.Errorf(codes.Internal, "failed to make subdirectory: %v", err.Error())
+	}
+	// Remove capacity setting when provisioner 1.4.0 is available with fix for
+	// https://github.com/kubernetes-csi/external-provisioner/pull/271
+	return &csi.CreateVolumeResponse{Volume: cs.nfsVolToCSI(nfsVol, req.GetCapacityRange().GetRequiredBytes())}, nil
 }
 
 func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) {
-	return nil, status.Error(codes.Unimplemented, "")
+	glog.V(4).Infof("DeleteVolume called with request %+v", req)
+
+	volumeId := req.GetVolumeId()
+	if volumeId == "" {
+		return nil, status.Error(codes.InvalidArgument, "volume id is empty")
+	}
+	nfsVol, err := cs.getNfsVolFromId(volumeId)
+	if err != nil {
+		// An invalid ID should be treated as if it doesn't exist
+		glog.V(5).Infof("failed to get nfs volume for volume id %v: %v", volumeId, err)
+		return &csi.DeleteVolumeResponse{}, nil
+	}
+
+	// Mount nfs base share so we can delete the subdirectory
+	if err = cs.internalMount(ctx, nfsVol); err != nil {
+		return nil, status.Errorf(codes.Internal, "failed to mount nfs server: %v", err.Error())
+	}
+	defer func() {
+		if err = cs.internalUnmount(ctx, nfsVol); err != nil {
+			glog.Warningf("failed to unmount nfs server: %v", err.Error())
+		}
+	}()
+
+	// Delete subdirectory under base-dir
+	internalVolumePath := cs.getInternalVolumePath(nfsVol)
+	glog.V(4).Infof("Removing subdirectory at %v", internalVolumePath)
+	if err = os.RemoveAll(internalVolumePath); err != nil {
+		return nil, status.Errorf(codes.Internal, "failed to delete subdirectory: %v", err.Error())
+	}
+
+	return &csi.DeleteVolumeResponse{}, nil
 }
 
 func (cs *ControllerServer) ControllerPublishVolume(ctx context.Context, req *csi.ControllerPublishVolumeRequest) (*csi.ControllerPublishVolumeResponse, error) {
@@ -43,8 +170,6 @@ func (cs *ControllerServer) GetCapacity(ctx context.Context, req *csi.GetCapacit
 
 // ControllerGetCapabilities implements the default GRPC callout.
 // Default supports all capabilities
 func (cs *ControllerServer) ControllerGetCapabilities(ctx context.Context, req *csi.ControllerGetCapabilitiesRequest) (*csi.ControllerGetCapabilitiesResponse, error) {
-	glog.V(5).Infof("Using default ControllerGetCapabilities")
-
 	return &csi.ControllerGetCapabilitiesResponse{
 		Capabilities: cs.Driver.cscap,
 	}, nil
@@ -65,3 +190,171 @@ func (cs *ControllerServer) ListSnapshots(ctx context.Context, req *csi.ListSnap
 func (cs *ControllerServer) ControllerExpandVolume(ctx context.Context, req *csi.ControllerExpandVolumeRequest) (*csi.ControllerExpandVolumeResponse, error) {
 	return nil, status.Error(codes.Unimplemented, "")
 }
+
+func (cs *ControllerServer) validateVolumeCapabilities(caps []*csi.VolumeCapability) error {
+	if len(caps) == 0 {
+		return fmt.Errorf("volume capabilities must be provided")
+	}
+
+	for _, c := range caps {
+		if err := cs.validateVolumeCapability(c); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+func (cs *ControllerServer) validateVolumeCapability(c *csi.VolumeCapability) error {
+	if c == nil {
+		return fmt.Errorf("volume capability must be provided")
+	}
+
+	// Validate access mode
+	accessMode := c.GetAccessMode()
+	if accessMode == nil {
+		return fmt.Errorf("volume capability access mode not set")
+	}
+	if !cs.Driver.cap[accessMode.Mode] {
+		return fmt.Errorf("driver does not support access mode: %v", accessMode.Mode.String())
+	}
+
+	// Validate access type
+	accessType := c.GetAccessType()
+	if accessType == nil {
+		return fmt.Errorf("volume capability access type not set")
+	}
+	mountType := c.GetMount()
+	if mountType == nil {
+		return fmt.Errorf("driver only supports mount access type volume capability")
+	}
+
+	if mountType.FsType != "" {
+		// TODO: uncomment once https://github.com/kubernetes-csi/external-provisioner/issues/328
+		// is fixed
+		// return fmt.Errorf("driver does not support fstype %v", mountType.FsType)
+	}
+	// TODO: check if we want to whitelist/blacklist certain mount options
+	return nil
+}
+
+// Mount nfs server at base-dir
+func (cs *ControllerServer) internalMount(ctx context.Context, vol *nfsVolume) error {
+	sharePath := filepath.Join("/", vol.baseDir)
+	targetPath := cs.getInternalMountPath(vol)
+
+	glog.V(4).Infof("internally mounting %v:%v at %v", vol.server, sharePath, targetPath)
+	_, err := cs.Driver.ns.NodePublishVolume(ctx, &csi.NodePublishVolumeRequest{
+		TargetPath: targetPath,
+		VolumeContext: map[string]string{
+			"server": vol.server,
+			"share":  sharePath,
+		},
+	})
+	return err
+}
+
+// Unmount nfs server at base-dir
+func (cs *ControllerServer) internalUnmount(ctx context.Context, vol *nfsVolume) error {
+	targetPath := cs.getInternalMountPath(vol)
+
+	glog.V(4).Infof("internally unmounting %v", targetPath)
+	_, err := cs.Driver.ns.NodeUnpublishVolume(ctx, &csi.NodeUnpublishVolumeRequest{
+		TargetPath: targetPath,
+	})
+	return err
+}
+
+// Convert CreateVolume parameters to an nfsVolume
+func (cs *ControllerServer) newNFSVolume(name string, params map[string]string) (*nfsVolume, error) {
+	var (
+		server  string
+		baseDir string
+	)
+
+	// Validate parameters (case-insensitive).
+	// TODO: do more strict validation.
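+	// For example, the hypothetical parameter set
+	// {"Server": "nfs.example.com", "base-dir": "exports"} resolves to
+	// server "nfs.example.com" and baseDir "exports"; keys are matched
+	// case-insensitively, values are taken as-is.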
+	for k, v := range params {
+		switch strings.ToLower(k) {
+		case paramServer:
+			server = v
+		case paramBaseDir:
+			baseDir = v
+		default:
+			return nil, fmt.Errorf("invalid parameter %q", k)
+		}
+	}
+
+	// Validate required parameters
+	if server == "" {
+		return nil, fmt.Errorf("%v is a required parameter", paramServer)
+	}
+	if baseDir == "" {
+		return nil, fmt.Errorf("%v is a required parameter", paramBaseDir)
+	}
+
+	vol := &nfsVolume{
+		server:  server,
+		baseDir: baseDir,
+		subDir:  name,
+	}
+	vol.id = cs.getVolumeIdFromNfsVol(vol)
+
+	return vol, nil
+}
+
+// Get working directory for CreateVolume and DeleteVolume
+func (cs *ControllerServer) getInternalMountPath(vol *nfsVolume) string {
+	return filepath.Join(cs.workingMountDir, vol.subDir)
+}
+
+// Get internal path where the volume is created.
+// The reason why the internal path is "workingDir/subDir/subDir" is because:
+//   * the semantic is actually "workingDir/volId/subDir" and volId == subDir.
+//   * we need a mount directory per volId because you can have multiple
+//     CreateVolume calls in parallel and they may use the same underlying share.
+//     Instead of refcounting how many CreateVolume calls are using the same
+//     share, it's simpler to just do a mount per request.
+// For example, volume "vol" is mounted at "workingDir/vol" and its data
+// directory is created at "workingDir/vol/vol".
+func (cs *ControllerServer) getInternalVolumePath(vol *nfsVolume) string {
+	return filepath.Join(cs.getInternalMountPath(vol), vol.subDir)
+}
+
+// Get user-visible share path for the volume
+func (cs *ControllerServer) getVolumeSharePath(vol *nfsVolume) string {
+	return filepath.Join("/", vol.baseDir, vol.subDir)
+}
+
+// Convert an nfsVolume into a csi.Volume
+func (cs *ControllerServer) nfsVolToCSI(vol *nfsVolume, reqBytes int64) *csi.Volume {
+	return &csi.Volume{
+		CapacityBytes: reqBytes,
+		VolumeId:      vol.id,
+		VolumeContext: map[string]string{
+			attrServer: vol.server,
+			attrShare:  cs.getVolumeSharePath(vol),
+		},
+	}
+}
+
+// Given a CSI volume id, return an nfsVolume
+func (cs *ControllerServer) getNfsVolFromId(id string) (*nfsVolume, error) {
+	tokens := strings.Split(id, "/")
+	if len(tokens) != totalIDElements {
+		return nil, fmt.Errorf("volume id %q unexpected format: got %v tokens", id, len(tokens))
+	}
+
+	return &nfsVolume{
+		id:      id,
+		server:  tokens[idServer],
+		baseDir: tokens[idBaseDir],
+		subDir:  tokens[idSubDir],
+	}, nil
+}
+
+// Given an nfsVolume, return a CSI volume id
+func (cs *ControllerServer) getVolumeIdFromNfsVol(vol *nfsVolume) string {
+	idElements := make([]string, totalIDElements)
+	idElements[idServer] = vol.server
+	idElements[idBaseDir] = vol.baseDir
+	idElements[idSubDir] = vol.subDir
+	return strings.Join(idElements, "/")
+}
diff --git a/pkg/nfs/controllerserver_test.go b/pkg/nfs/controllerserver_test.go
new file mode 100644
index 000000000..c01351ad2
--- /dev/null
+++ b/pkg/nfs/controllerserver_test.go
@@ -0,0 +1,334 @@
+package nfs
+
+import (
+	"io/ioutil"
+	"os"
+	"path/filepath"
+	"reflect"
+	"strings"
+	"testing"
+
+	"github.com/container-storage-interface/spec/lib/go/csi"
+	"golang.org/x/net/context"
+	"k8s.io/utils/mount"
+)
+
+const (
+	testServer    = "test-server"
+	testBaseDir   = "test-base-dir"
+	testCSIVolume = "test-csi"
+	testVolumeId  = "test-server/test-base-dir/test-csi"
+	testShare     = "/test-base-dir/test-csi"
+)
+
+func initTestController(t *testing.T) *ControllerServer {
+	tmpDir, err := ioutil.TempDir(os.TempDir(), "csi-nfs-controller-test")
+	if err != nil {
+		t.Fatalf("failed to create tmp testing dir")
+	}
+	t.Cleanup(func() { os.RemoveAll(tmpDir) })
+
+	mounter := &mount.FakeMounter{MountPoints: []mount.MountPoint{}}
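+	// FakeMounter records mount and unmount calls in memory instead of
+	// invoking mount(8), so the controller's internal mounts in these
+	// tests never touch the host.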
+	driver := NewNFSdriver("", "")
+	driver.ns = NewNodeServer(driver, mounter)
+	return NewControllerServer(driver, tmpDir)
+}
+
+func TestCreateVolume(t *testing.T) {
+	cases := []struct {
+		name      string
+		req       *csi.CreateVolumeRequest
+		resp      *csi.CreateVolumeResponse
+		expectErr bool
+	}{
+		{
+			name: "valid defaults",
+			req: &csi.CreateVolumeRequest{
+				Name: testCSIVolume,
+				VolumeCapabilities: []*csi.VolumeCapability{
+					{
+						AccessType: &csi.VolumeCapability_Mount{
+							Mount: &csi.VolumeCapability_MountVolume{},
+						},
+						AccessMode: &csi.VolumeCapability_AccessMode{
+							Mode: csi.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER,
+						},
+					},
+				},
+				Parameters: map[string]string{
+					paramServer:  testServer,
+					paramBaseDir: testBaseDir,
+				},
+			},
+			resp: &csi.CreateVolumeResponse{
+				Volume: &csi.Volume{
+					VolumeId: testVolumeId,
+					VolumeContext: map[string]string{
+						attrServer: testServer,
+						attrShare:  testShare,
+					},
+				},
+			},
+		},
+		{
+			name: "name empty",
+			req: &csi.CreateVolumeRequest{
+				VolumeCapabilities: []*csi.VolumeCapability{
+					{
+						AccessType: &csi.VolumeCapability_Mount{
+							Mount: &csi.VolumeCapability_MountVolume{},
+						},
+						AccessMode: &csi.VolumeCapability_AccessMode{
+							Mode: csi.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER,
+						},
+					},
+				},
+				Parameters: map[string]string{
+					paramServer:  testServer,
+					paramBaseDir: testBaseDir,
+				},
+			},
+			expectErr: true,
+		},
+		{
+			name: "invalid volume capability",
+			req: &csi.CreateVolumeRequest{
+				Name: testCSIVolume,
+				VolumeCapabilities: []*csi.VolumeCapability{
+					{
+						AccessMode: &csi.VolumeCapability_AccessMode{
+							Mode: csi.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER,
+						},
+					},
+				},
+				Parameters: map[string]string{
+					paramServer:  testServer,
+					paramBaseDir: testBaseDir,
+				},
+			},
+			expectErr: true,
+		},
+		{
+			name: "invalid create context",
+			req: &csi.CreateVolumeRequest{
+				Name: testCSIVolume,
+				VolumeCapabilities: []*csi.VolumeCapability{
+					{
+						AccessType: &csi.VolumeCapability_Mount{
+							Mount: &csi.VolumeCapability_MountVolume{},
+						},
+						AccessMode: &csi.VolumeCapability_AccessMode{
+							Mode: csi.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER,
+						},
+					},
+				},
+				Parameters: map[string]string{
+					"unknown-parameter": "foo",
+				},
+			},
+			expectErr: true,
+		},
+	}
+
+	for _, test := range cases {
+		t.Run(test.name, func(t *testing.T) {
+			// Setup
+			cs := initTestController(t)
+
+			// Run
+			resp, err := cs.CreateVolume(context.TODO(), test.req)
+
+			// Verify
+			if !test.expectErr && err != nil {
+				t.Errorf("test %q failed: %v", test.name, err)
+			}
+			if test.expectErr && err == nil {
+				t.Errorf("test %q failed; got success", test.name)
+			}
+			if !reflect.DeepEqual(resp, test.resp) {
+				t.Errorf("test %q failed: got resp %+v, expected %+v", test.name, resp, test.resp)
+			}
+			if !test.expectErr {
+				info, err := os.Stat(filepath.Join(cs.workingMountDir, test.req.Name, test.req.Name))
+				if err != nil {
+					t.Errorf("test %q failed: couldn't find volume subdirectory: %v", test.name, err)
+				} else if !info.IsDir() {
+					t.Errorf("test %q failed: subfile not a directory", test.name)
+				}
+			}
+		})
+	}
+}
+
+func TestDeleteVolume(t *testing.T) {
+	cases := []struct {
+		name             string
+		req              *csi.DeleteVolumeRequest
+		internalMountDir string
+		expectErr        bool
+	}{
+		{
+			name: "valid",
+			req: &csi.DeleteVolumeRequest{
+				VolumeId: testVolumeId,
+			},
+			internalMountDir: filepath.Join(testCSIVolume, testCSIVolume),
+		},
+		{
+			name: "invalid id",
+			req: &csi.DeleteVolumeRequest{
+				VolumeId: testVolumeId + "/foo",
+			},
+		},
+		{
+			name:      "empty id",
+			req:       &csi.DeleteVolumeRequest{},
+			expectErr: true,
+		},
+	}
+
+	for _, test := range cases {
+		t.Run(test.name, func(t *testing.T) {
+			// Setup
+			var internalMountPath string
+			cs := initTestController(t)
+			if test.internalMountDir != "" {
+				internalMountPath = filepath.Join(cs.workingMountDir, test.internalMountDir)
+				if err := os.MkdirAll(internalMountPath, 0755); err != nil {
+					t.Fatalf("test %q failed: failed to setup volume: %v", test.name, err)
+				}
+			}
+
+			// Run
+			_, err := cs.DeleteVolume(context.TODO(), test.req)
+
+			// Verify
+			if !test.expectErr && err != nil {
+				t.Errorf("test %q failed: %v", test.name, err)
+			}
+			if test.expectErr && err == nil {
+				t.Errorf("test %q failed; got success", test.name)
+			}
+			if !test.expectErr {
+				_, err := os.Stat(internalMountPath)
+				if err != nil && !os.IsNotExist(err) {
+					t.Errorf("test %q failed: couldn't get info on subdirectory: %v", test.name, err)
+				} else if err == nil {
+					t.Errorf("test %q failed: subdirectory still exists", test.name)
+				}
+			}
+		})
+	}
+}
+
+func TestGenerateNewNFSVolume(t *testing.T) {
+	cases := []struct {
+		name      string
+		params    map[string]string
+		expectVol *nfsVolume
+		expectErr bool
+	}{
+		{
+			name: "required params",
+			params: map[string]string{
+				paramServer:  testServer,
+				paramBaseDir: testBaseDir,
+			},
+			expectVol: &nfsVolume{
+				id:      testVolumeId,
+				server:  testServer,
+				baseDir: testBaseDir,
+				subDir:  testCSIVolume,
+			},
+		},
+		{
+			name: "missing required baseDir",
+			params: map[string]string{
+				paramServer: testServer,
+			},
+			expectErr: true,
+		},
+		{
+			name: "missing required server",
+			params: map[string]string{
+				paramBaseDir: testBaseDir,
+			},
+			expectErr: true,
+		},
+		{
+			name: "invalid params",
+			params: map[string]string{
+				"foo-param": "bar",
+			},
+			expectErr: true,
+		},
+	}
+
+	for _, test := range cases {
+		t.Run(test.name, func(t *testing.T) {
+			cs := initTestController(t)
+			vol, err := cs.newNFSVolume(testCSIVolume, test.params)
+			if !test.expectErr && err != nil {
+				t.Errorf("test %q failed: %v", test.name, err)
+			}
+			if test.expectErr && err == nil {
+				t.Errorf("test %q failed; got success", test.name)
+			}
+			if !reflect.DeepEqual(vol, test.expectVol) {
+				t.Errorf("test %q failed: got volume %+v, expected %+v", test.name, vol, test.expectVol)
+			}
+		})
+	}
+}
+
+func TestGetNfsVolFromId(t *testing.T) {
+	cases := []struct {
+		name      string
+		id        string
+		expectVol *nfsVolume
+		expectErr bool
+	}{
+		{
+			name: "valid id",
+			id:   testVolumeId,
+			expectVol: &nfsVolume{
+				id:      testVolumeId,
+				server:  testServer,
+				baseDir: testBaseDir,
+				subDir:  testCSIVolume,
+			},
+		},
+		{
+			name:      "empty id",
+			id:        "",
+			expectErr: true,
+		},
+		{
+			name:      "not enough elements",
+			id:        strings.Join([]string{testServer, testBaseDir}, "/"),
+			expectErr: true,
+		},
+		{
+			name:      "too many elements",
+			id:        strings.Join([]string{testServer, testBaseDir, testCSIVolume, "more"}, "/"),
+			expectErr: true,
+		},
+	}
+
+	for _, test := range cases {
+		t.Run(test.name, func(t *testing.T) {
+			cs := initTestController(t)
+			vol, err := cs.getNfsVolFromId(test.id)
+			if !test.expectErr && err != nil {
+				t.Errorf("test %q failed: %v", test.name, err)
+			}
+			if test.expectErr && err == nil {
+				t.Errorf("test %q failed; got success", test.name)
+			}
+			if !reflect.DeepEqual(vol, test.expectVol) {
+				t.Errorf("test %q failed: got volume %+v, expected %+v", test.name, vol, test.expectVol)
+			}
+		})
+	}
+}
diff --git a/pkg/nfs/nfs.go b/pkg/nfs/nfs.go
index 728132c17..6763f2cbe 100644
--- a/pkg/nfs/nfs.go
+++ b/pkg/nfs/nfs.go
@@ -36,7 +36,8 @@ type nfsDriver struct {
 }
 
 const (
-	driverName = "nfs.csi.k8s.io"
+	driverName       = "nfs.csi.k8s.io"
+	internalMountDir = "/provisioner-working-mounts"
 )
 
 var (
@@ -63,11 +64,8 @@ func NewNFSdriver(nodeID, endpoint string) *nfsDriver {
 	}
 	n.AddVolumeCapabilityAccessModes(vcam)
 
-	// NFS plugin does not support ControllerServiceCapability now.
-	// If support is added, it should set to appropriate
-	// ControllerServiceCapability RPC types.
-	n.AddControllerServiceCapabilities([]csi.ControllerServiceCapability_RPC_Type{csi.ControllerServiceCapability_RPC_UNKNOWN})
-
+	n.AddControllerServiceCapabilities([]csi.ControllerServiceCapability_RPC_Type{
+		csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME})
 	return n
 }
 
@@ -85,7 +83,5 @@ func (n *nfsDriver) Run() {
 		NewDefaultIdentityServer(n),
-		// NFS plugin has not implemented ControllerServer
-		// using default controllerserver.
-		NewControllerServer(n),
+		NewControllerServer(n, internalMountDir),
 		n.ns)
 	s.Wait()
 }
diff --git a/pkg/nfs/nodeserver.go b/pkg/nfs/nodeserver.go
index 439d3c8bc..b83da4d51 100644
--- a/pkg/nfs/nodeserver.go
+++ b/pkg/nfs/nodeserver.go
@@ -58,6 +58,9 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
 		mo = append(mo, "ro")
 	}
 
+	// TODO: "nolock" avoids needing rpcbind/rpc.statd on the node, but it makes file locks local to each client; revisit the consequences.
+	mo = append(mo, "nolock")
+
 	s := req.GetVolumeContext()["server"]
 	ep := req.GetVolumeContext()["share"]
 	source := fmt.Sprintf("%s:%s", s, ep)
diff --git a/pkg/nfs/utils.go b/pkg/nfs/utils.go
index 63e13f8ac..f076e41e1 100644
--- a/pkg/nfs/utils.go
+++ b/pkg/nfs/utils.go
@@ -2,12 +2,13 @@ package nfs
 
 import (
 	"fmt"
+	"strings"
+
 	"github.com/container-storage-interface/spec/lib/go/csi"
 	"github.com/golang/glog"
 	"github.com/kubernetes-csi/csi-lib-utils/protosanitizer"
 	"golang.org/x/net/context"
 	"google.golang.org/grpc"
-	"strings"
 )
 
 func NewDefaultIdentityServer(d *nfsDriver) *IdentityServer {
@@ -16,12 +17,6 @@ func NewDefaultIdentityServer(d *nfsDriver) *IdentityServer {
 	}
 }
 
-func NewControllerServer(d *nfsDriver) *ControllerServer {
-	return &ControllerServer{
-		Driver: d,
-	}
-}
-
 func NewControllerServiceCapability(cap csi.ControllerServiceCapability_RPC_Type) *csi.ControllerServiceCapability {
 	return &csi.ControllerServiceCapability{
 		Type: &csi.ControllerServiceCapability_Rpc{
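
Note: the following is an illustrative sketch, not part of the patch. It shows how the id helpers above round-trip a volume through the {server}/{baseDir}/{subDir} format; it reuses initTestController from the new test file, and the volume values are hypothetical.

package nfs

import (
	"reflect"
	"testing"
)

func TestVolumeIdRoundTrip(t *testing.T) {
	cs := initTestController(t)

	// Hypothetical volume; any server/baseDir/subDir combination that is
	// one directory deep works with the current id format.
	want := &nfsVolume{
		server:  "nfs.example.com",
		baseDir: "exports",
		subDir:  "pvc-1234",
	}
	want.id = cs.getVolumeIdFromNfsVol(want) // "nfs.example.com/exports/pvc-1234"

	got, err := cs.getNfsVolFromId(want.id)
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}
	if !reflect.DeepEqual(got, want) {
		t.Errorf("round trip mismatch: got %+v, want %+v", got, want)
	}
}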