Skip to content

Commit

Permalink
CSI: protobuffer mappings for Create/Delete/List volume RPCs
Browse files Browse the repository at this point in the history
  • Loading branch information
tgross authored Mar 15, 2021
1 parent eade3e0 commit bbb9699
Show file tree
Hide file tree
Showing 5 changed files with 645 additions and 0 deletions.
88 changes: 88 additions & 0 deletions plugins/csi/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ type CSIControllerClient interface {
ControllerPublishVolume(ctx context.Context, in *csipbv1.ControllerPublishVolumeRequest, opts ...grpc.CallOption) (*csipbv1.ControllerPublishVolumeResponse, error)
ControllerUnpublishVolume(ctx context.Context, in *csipbv1.ControllerUnpublishVolumeRequest, opts ...grpc.CallOption) (*csipbv1.ControllerUnpublishVolumeResponse, error)
ValidateVolumeCapabilities(ctx context.Context, in *csipbv1.ValidateVolumeCapabilitiesRequest, opts ...grpc.CallOption) (*csipbv1.ValidateVolumeCapabilitiesResponse, error)
CreateVolume(ctx context.Context, in *csipbv1.CreateVolumeRequest, opts ...grpc.CallOption) (*csipbv1.CreateVolumeResponse, error)
ListVolumes(ctx context.Context, in *csipbv1.ListVolumesRequest, opts ...grpc.CallOption) (*csipbv1.ListVolumesResponse, error)
DeleteVolume(ctx context.Context, in *csipbv1.DeleteVolumeRequest, opts ...grpc.CallOption) (*csipbv1.DeleteVolumeResponse, error)
}

// CSINodeClient defines the minimal CSI Node Plugin interface used
Expand Down Expand Up @@ -383,6 +386,91 @@ func (c *client) ControllerValidateCapabilities(ctx context.Context, req *Contro
return nil
}

func (c *client) ControllerCreateVolume(ctx context.Context, req *ControllerCreateVolumeRequest, opts ...grpc.CallOption) (*ControllerCreateVolumeResponse, error) {
err := req.Validate()
if err != nil {
return nil, err
}
creq := req.ToCSIRepresentation()
resp, err := c.controllerClient.CreateVolume(ctx, creq, opts...)

// these standard gRPC error codes are overloaded with CSI-specific
// meanings, so translate them into user-understandable terms
// https://github.com/container-storage-interface/spec/blob/master/spec.md#createvolume-errors
if err != nil {
code := status.Code(err)
switch code {
case codes.InvalidArgument:
return nil, fmt.Errorf(
"volume %q snapshot source %q is not compatible with these parameters: %v",
req.Name, req.ContentSource, err)
case codes.NotFound:
return nil, fmt.Errorf(
"volume %q content source %q does not exist: %v",
req.Name, req.ContentSource, err)
case codes.AlreadyExists:
return nil, fmt.Errorf(
"volume %q already exists but is incompatible with these parameters: %v",
req.Name, err)
case codes.ResourceExhausted:
return nil, fmt.Errorf(
"unable to provision %q in accessible_topology: %v",
req.Name, err)
case codes.OutOfRange:
return nil, fmt.Errorf(
"unsupported capacity_range for volume %q: %v", req.Name, err)
case codes.Internal:
return nil, fmt.Errorf(
"controller plugin returned an internal error, check the plugin allocation logs for more information: %v", err)
}
return nil, err
}

return NewCreateVolumeResponse(resp), nil
}

func (c *client) ControllerListVolumes(ctx context.Context, req *ControllerListVolumesRequest, opts ...grpc.CallOption) (*ControllerListVolumesResponse, error) {
err := req.Validate()
if err != nil {
return nil, err
}
creq := req.ToCSIRepresentation()
resp, err := c.controllerClient.ListVolumes(ctx, creq, opts...)
if err != nil {
code := status.Code(err)
switch code {
case codes.Aborted:
return nil, fmt.Errorf(
"invalid starting token %q: %v", req.StartingToken, err)
case codes.Internal:
return nil, fmt.Errorf(
"controller plugin returned an internal error, check the plugin allocation logs for more information: %v", err)
}
return nil, err
}
return NewListVolumesResponse(resp), nil
}

func (c *client) ControllerDeleteVolume(ctx context.Context, req *ControllerDeleteVolumeRequest, opts ...grpc.CallOption) error {
err := req.Validate()
if err != nil {
return err
}
creq := req.ToCSIRepresentation()
_, err = c.controllerClient.DeleteVolume(ctx, creq, opts...)
if err != nil {
code := status.Code(err)
switch code {
case codes.FailedPrecondition:
return fmt.Errorf("volume %q is in use: %v", req.ExternalVolumeID, err)
case codes.Internal:
return fmt.Errorf(
"controller plugin returned an internal error, check the plugin allocation logs for more information: %v", err)
}
}
return err
}

// compareCapabilities returns an error if the 'got' capabilities aren't found
// within the 'expected' capability.
//
Expand Down
242 changes: 242 additions & 0 deletions plugins/csi/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -698,7 +698,249 @@ func TestClient_RPC_ControllerValidateVolume(t *testing.T) {
}
})
}
}

func TestClient_RPC_ControllerCreateVolume(t *testing.T) {

cases := []struct {
Name string
CapacityRange *CapacityRange
ContentSource *VolumeContentSource
ResponseErr error
Response *csipbv1.CreateVolumeResponse
ExpectedErr error
}{
{
Name: "handles underlying grpc errors",
ResponseErr: status.Errorf(codes.Internal, "some grpc error"),
ExpectedErr: fmt.Errorf("controller plugin returned an internal error, check the plugin allocation logs for more information: rpc error: code = Internal desc = some grpc error"),
},

{
Name: "handles error invalid capacity range",
CapacityRange: &CapacityRange{
RequiredBytes: 1000,
LimitBytes: 500,
},
ExpectedErr: errors.New("LimitBytes cannot be less than RequiredBytes"),
},

{
Name: "handles error invalid content source",
ContentSource: &VolumeContentSource{
SnapshotID: "snap-12345",
CloneID: "vol-12345",
},
ExpectedErr: errors.New(
"one of SnapshotID or CloneID must be set if ContentSource is set"),
},

{
Name: "handles success missing source and range",
Response: &csipbv1.CreateVolumeResponse{},
},

{
Name: "handles success with capacity range and source",
CapacityRange: &CapacityRange{
RequiredBytes: 500,
LimitBytes: 1000,
},
ContentSource: &VolumeContentSource{
SnapshotID: "snap-12345",
},
Response: &csipbv1.CreateVolumeResponse{
Volume: &csipbv1.Volume{
CapacityBytes: 1000,
ContentSource: &csipbv1.VolumeContentSource{
Type: &csipbv1.VolumeContentSource_Snapshot{
Snapshot: &csipbv1.VolumeContentSource_SnapshotSource{
SnapshotId: "snap-12345",
},
},
},
},
},
},
}
for _, tc := range cases {
t.Run(tc.Name, func(t *testing.T) {
_, cc, _, client := newTestClient()
defer client.Close()

req := &ControllerCreateVolumeRequest{
Name: "vol-123456",
CapacityRange: tc.CapacityRange,
VolumeCapabilities: []*VolumeCapability{
{
AccessType: VolumeAccessTypeMount,
AccessMode: VolumeAccessModeMultiNodeMultiWriter,
},
},
Parameters: map[string]string{},
Secrets: structs.CSISecrets{},
ContentSource: tc.ContentSource,
AccessibilityRequirements: &TopologyRequirement{},
}

cc.NextCreateVolumeResponse = tc.Response
cc.NextErr = tc.ResponseErr

resp, err := client.ControllerCreateVolume(context.TODO(), req)
if tc.ExpectedErr != nil {
require.EqualError(t, err, tc.ExpectedErr.Error())
return
}
require.NoError(t, err, tc.Name)
if tc.Response == nil {
require.Nil(t, resp)
return
}
if tc.CapacityRange != nil {
require.Greater(t, resp.Volume.CapacityBytes, int64(0))
}
if tc.ContentSource != nil {
require.Equal(t, tc.ContentSource.CloneID, resp.Volume.ContentSource.CloneID)
require.Equal(t, tc.ContentSource.SnapshotID, resp.Volume.ContentSource.SnapshotID)
}
})
}
}

func TestClient_RPC_ControllerDeleteVolume(t *testing.T) {

cases := []struct {
Name string
Request *ControllerDeleteVolumeRequest
ResponseErr error
ExpectedErr error
}{
{
Name: "handles underlying grpc errors",
Request: &ControllerDeleteVolumeRequest{ExternalVolumeID: "vol-12345"},
ResponseErr: status.Errorf(codes.Internal, "some grpc error"),
ExpectedErr: fmt.Errorf("controller plugin returned an internal error, check the plugin allocation logs for more information: rpc error: code = Internal desc = some grpc error"),
},

{
Name: "handles error missing volume ID",
Request: &ControllerDeleteVolumeRequest{},
ExpectedErr: errors.New("missing ExternalVolumeID"),
},

{
Name: "handles success",
Request: &ControllerDeleteVolumeRequest{ExternalVolumeID: "vol-12345"},
},
}
for _, tc := range cases {
t.Run(tc.Name, func(t *testing.T) {
_, cc, _, client := newTestClient()
defer client.Close()

cc.NextErr = tc.ResponseErr
err := client.ControllerDeleteVolume(context.TODO(), tc.Request)
if tc.ExpectedErr != nil {
require.EqualError(t, err, tc.ExpectedErr.Error())
return
}
require.NoError(t, err, tc.Name)
})
}
}

func TestClient_RPC_ControllerListVolume(t *testing.T) {

cases := []struct {
Name string
Request *ControllerListVolumesRequest
ResponseErr error
ExpectedErr error
}{
{
Name: "handles underlying grpc errors",
Request: &ControllerListVolumesRequest{},
ResponseErr: status.Errorf(codes.Internal, "some grpc error"),
ExpectedErr: fmt.Errorf("controller plugin returned an internal error, check the plugin allocation logs for more information: rpc error: code = Internal desc = some grpc error"),
},

{
Name: "handles error invalid max entries",
Request: &ControllerListVolumesRequest{MaxEntries: -1},
ExpectedErr: errors.New("MaxEntries cannot be negative"),
},

{
Name: "handles success",
Request: &ControllerListVolumesRequest{},
},
}

for _, tc := range cases {
t.Run(tc.Name, func(t *testing.T) {
_, cc, _, client := newTestClient()
defer client.Close()

cc.NextErr = tc.ResponseErr
if tc.ResponseErr != nil {
// note: there's nothing interesting to assert here other than
// that we don't throw a NPE during transformation from
// protobuf to our struct
cc.NextListVolumesResponse = &csipbv1.ListVolumesResponse{
Entries: []*csipbv1.ListVolumesResponse_Entry{
{
Volume: &csipbv1.Volume{
CapacityBytes: 1000000,
VolumeId: "vol-0",
VolumeContext: map[string]string{"foo": "bar"},

ContentSource: &csipbv1.VolumeContentSource{},
AccessibleTopology: []*csipbv1.Topology{
{
Segments: map[string]string{"rack": "A"},
},
},
},
},

{
Volume: &csipbv1.Volume{
VolumeId: "vol-1",
AccessibleTopology: []*csipbv1.Topology{
{
Segments: map[string]string{"rack": "A"},
},
},
},
},

{
Volume: &csipbv1.Volume{
VolumeId: "vol-3",
ContentSource: &csipbv1.VolumeContentSource{
Type: &csipbv1.VolumeContentSource_Snapshot{
Snapshot: &csipbv1.VolumeContentSource_SnapshotSource{
SnapshotId: "snap-12345",
},
},
},
},
},
},
NextToken: "abcdef",
}
}

resp, err := client.ControllerListVolumes(context.TODO(), tc.Request)
if tc.ExpectedErr != nil {
require.EqualError(t, err, tc.ExpectedErr.Error())
return
}
require.NoError(t, err, tc.Name)
require.NotNil(t, resp)

})
}
}

func TestClient_RPC_NodeStageVolume(t *testing.T) {
Expand Down
32 changes: 32 additions & 0 deletions plugins/csi/fake/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,17 @@ type Client struct {
NextControllerUnpublishVolumeErr error
ControllerUnpublishVolumeCallCount int64

NextControllerCreateVolumeResponse *csi.ControllerCreateVolumeResponse
NextControllerCreateVolumeErr error
ControllerCreateVolumeCallCount int64

NextControllerDeleteVolumeErr error
ControllerDeleteVolumeCallCount int64

NextControllerListVolumesResponse *csi.ControllerListVolumesResponse
NextControllerListVolumesErr error
ControllerListVolumesCallCount int64

NextControllerValidateVolumeErr error
ControllerValidateVolumeCallCount int64

Expand Down Expand Up @@ -168,6 +179,27 @@ func (c *Client) ControllerValidateCapabilities(ctx context.Context, req *csi.Co
return c.NextControllerValidateVolumeErr
}

func (c *Client) ControllerCreateVolume(ctx context.Context, in *csi.ControllerCreateVolumeRequest, opts ...grpc.CallOption) (*csi.ControllerCreateVolumeResponse, error) {
c.Mu.Lock()
defer c.Mu.Unlock()
c.ControllerCreateVolumeCallCount++
return c.NextControllerCreateVolumeResponse, c.NextControllerCreateVolumeErr
}

func (c *Client) ControllerDeleteVolume(ctx context.Context, req *csi.ControllerDeleteVolumeRequest, opts ...grpc.CallOption) error {
c.Mu.Lock()
defer c.Mu.Unlock()
c.ControllerDeleteVolumeCallCount++
return c.NextControllerDeleteVolumeErr
}

func (c *Client) ControllerListVolumes(ctx context.Context, req *csi.ControllerListVolumesRequest, opts ...grpc.CallOption) (*csi.ControllerListVolumesResponse, error) {
c.Mu.Lock()
defer c.Mu.Unlock()
c.ControllerListVolumesCallCount++
return c.NextControllerListVolumesResponse, c.NextControllerListVolumesErr
}

func (c *Client) NodeGetCapabilities(ctx context.Context) (*csi.NodeCapabilitySet, error) {
c.Mu.Lock()
defer c.Mu.Unlock()
Expand Down
Loading

0 comments on commit bbb9699

Please sign in to comment.