Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GET_VOLUME_STATS node capability added #110

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 98 additions & 0 deletions pkg/driver/mocks/mock_stats.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

70 changes: 69 additions & 1 deletion pkg/driver/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ var (
nodeCaps = []csi.NodeServiceCapability_RPC_Type{
csi.NodeServiceCapability_RPC_STAGE_UNSTAGE_VOLUME,
csi.NodeServiceCapability_RPC_EXPAND_VOLUME,
csi.NodeServiceCapability_RPC_GET_VOLUME_STATS,
}
)

Expand All @@ -66,6 +67,7 @@ type nodeService struct {
driverOptions *Options
pvmInstanceId string
volumeLocks *util.VolumeLocks
stats StatsUtils
Madhan-SWE marked this conversation as resolved.
Show resolved Hide resolved
}

// newNodeService creates a new node service
Expand All @@ -88,6 +90,7 @@ func newNodeService(driverOptions *Options) nodeService {
driverOptions: driverOptions,
pvmInstanceId: metadata.GetPvmInstanceId(),
volumeLocks: util.NewVolumeLocks(),
stats: &VolumeStatUtils{},
}
}

Expand Down Expand Up @@ -380,7 +383,72 @@ func (d *nodeService) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu
}

func (d *nodeService) NodeGetVolumeStats(ctx context.Context, req *csi.NodeGetVolumeStatsRequest) (*csi.NodeGetVolumeStatsResponse, error) {
return nil, status.Error(codes.Unimplemented, "NodeGetVolumeStats is not implemented yet")
klog.V(4).Infof("NodeGetVolumeStats: called with args %+v", *req)
var resp *csi.NodeGetVolumeStatsResponse

if req == nil || req.VolumeId == "" {
return nil, status.Error(codes.InvalidArgument, "Volume ID not provided")
}

if req.VolumePath == "" {
return nil, status.Error(codes.InvalidArgument, "VolumePath not provided")
}

volumePath := req.VolumePath
// return if path does not exist
if d.stats.IsPathNotExist(volumePath) {
return nil, status.Error(codes.NotFound, "VolumePath not exist")
}

// check if volume mode is raw volume mode
isBlock, err := d.stats.IsBlockDevice(volumePath)
if err != nil {
return nil, status.Error(codes.Internal, fmt.Sprintf("Failed to check volume %s is block device or not: %v", req.VolumeId, err))
}
// if block device, get deviceStats
if isBlock {
capacity, err := d.stats.DeviceInfo(volumePath)
if err != nil {
return nil, status.Error(codes.Internal, fmt.Sprintf("Failed to collect block device info: %v", err))
}

resp = &csi.NodeGetVolumeStatsResponse{
Usage: []*csi.VolumeUsage{
{
Total: capacity,
Unit: csi.VolumeUsage_BYTES,
},
},
}

klog.V(4).Infof("Block Device Volume stats collected: %+v\n", resp)
return resp, nil
}

// else get the file system stats
available, capacity, usage, inodes, inodesFree, inodesUsed, err := d.stats.FSInfo(volumePath)
if err != nil {
return nil, status.Error(codes.Internal, fmt.Sprintf("Failed to collect FSInfo: %v", err))
}
resp = &csi.NodeGetVolumeStatsResponse{
Usage: []*csi.VolumeUsage{
{
Available: available,
Total: capacity,
Used: usage,
Unit: csi.VolumeUsage_BYTES,
},
{
Available: inodesFree,
Total: inodes,
Used: inodesUsed,
Unit: csi.VolumeUsage_INODES,
},
},
}

klog.V(4).Infof("FS Volume stats collected: %+v\n", resp)
return resp, nil
}

func (d *nodeService) NodeGetCapabilities(ctx context.Context, req *csi.NodeGetCapabilitiesRequest) (*csi.NodeGetCapabilitiesResponse, error) {
Expand Down
112 changes: 112 additions & 0 deletions pkg/driver/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package driver
import (
"context"
"errors"
"fmt"
"reflect"
"testing"

Expand Down Expand Up @@ -540,6 +541,117 @@ func TestNodeGetInfo(t *testing.T) {
}
}

func TestNodeGetVolumeStats(t *testing.T) {
testCases := []struct {
name string
testFunc func(t *testing.T)
}{
{name: "success block device volume",
testFunc: func(t *testing.T) {
mockCtl := gomock.NewController(t)
defer mockCtl.Finish()
mockStatUtils := mocks.NewMockStatsUtils(mockCtl)

volumePath := "./test"
var mockCapacity int64 = 100
mockStatUtils.EXPECT().IsPathNotExist(volumePath).Return(false)
mockStatUtils.EXPECT().IsBlockDevice(volumePath).Return(true, nil)
mockStatUtils.EXPECT().DeviceInfo(volumePath).Return(mockCapacity, nil)
driver := &nodeService{stats: mockStatUtils}

req := csi.NodeGetVolumeStatsRequest{VolumeId: volumeID, VolumePath: volumePath}

resp, err := driver.NodeGetVolumeStats(context.TODO(), &req)
if err != nil {
t.Fatalf("Expect no error but got: %v", err)
}

if resp.Usage[0].Total != mockCapacity {
t.Fatalf("Expected total capacity as %d, got %d", mockCapacity, resp.Usage[0].Total)
}
},
}, {
name: "failure path not exist",
testFunc: func(t *testing.T) {
mockCtl := gomock.NewController(t)
defer mockCtl.Finish()
mockStatUtils := mocks.NewMockStatsUtils(mockCtl)

volumePath := "./test"
mockStatUtils.EXPECT().IsPathNotExist(volumePath).Return(true)
driver := &nodeService{stats: mockStatUtils}

req := csi.NodeGetVolumeStatsRequest{VolumeId: volumeID, VolumePath: volumePath}

_, err := driver.NodeGetVolumeStats(context.TODO(), &req)
expectErr(t, err, codes.NotFound)
},
}, {
name: "failure checking for block device",
testFunc: func(t *testing.T) {
mockCtl := gomock.NewController(t)
defer mockCtl.Finish()
mockStatUtils := mocks.NewMockStatsUtils(mockCtl)

volumePath := "./test"
mockStatUtils.EXPECT().IsPathNotExist(volumePath).Return(false)
mockStatUtils.EXPECT().IsBlockDevice(volumePath).Return(false, errors.New("Error checking for block device"))
driver := &nodeService{stats: mockStatUtils}

req := csi.NodeGetVolumeStatsRequest{VolumeId: volumeID, VolumePath: volumePath}

_, err := driver.NodeGetVolumeStats(context.TODO(), &req)
expectErr(t, err, codes.Internal)
},
}, {
name: "failure collecting block device info",
testFunc: func(t *testing.T) {
mockCtl := gomock.NewController(t)
defer mockCtl.Finish()
mockStatUtils := mocks.NewMockStatsUtils(mockCtl)

volumePath := "./test"
mockStatUtils.EXPECT().IsPathNotExist(volumePath).Return(false)
mockStatUtils.EXPECT().IsBlockDevice(volumePath).Return(true, nil)
mockStatUtils.EXPECT().DeviceInfo(volumePath).Return(int64(0), errors.New("Error collecting block device info"))

driver := &nodeService{stats: mockStatUtils}

req := csi.NodeGetVolumeStatsRequest{VolumeId: volumeID, VolumePath: volumePath}

_, err := driver.NodeGetVolumeStats(context.TODO(), &req)
fmt.Println(err)
expectErr(t, err, codes.Internal)
},
},
{
name: "failure collecting fs info",
testFunc: func(t *testing.T) {
mockCtl := gomock.NewController(t)
defer mockCtl.Finish()
mockStatUtils := mocks.NewMockStatsUtils(mockCtl)

volumePath := "./test"
mockStatUtils.EXPECT().IsPathNotExist(volumePath).Return(false)
mockStatUtils.EXPECT().IsBlockDevice(volumePath).Return(false, nil)
var statUnit int64 = 0
mockStatUtils.EXPECT().FSInfo(volumePath).Return(statUnit, statUnit, statUnit, statUnit, statUnit, statUnit, errors.New("Error collecting FS Info"))
driver := &nodeService{stats: mockStatUtils}

req := csi.NodeGetVolumeStatsRequest{VolumeId: volumeID, VolumePath: volumePath}

_, err := driver.NodeGetVolumeStats(context.TODO(), &req)
fmt.Println(err)

expectErr(t, err, codes.Internal)
},
},
}
for _, tc := range testCases {
t.Run(tc.name, tc.testFunc)
}
}

func expectErr(t *testing.T, actualErr error, expectedCode codes.Code) {
if actualErr == nil {
t.Fatalf("Expect error but got no error")
Expand Down
67 changes: 67 additions & 0 deletions pkg/driver/stats.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package driver

import (
"fmt"
"os"
"os/exec"
"strconv"
"strings"

"golang.org/x/sys/unix"
"k8s.io/kubernetes/pkg/volume/util/fs"
)

// StatsUtils ...
type StatsUtils interface {
FSInfo(path string) (int64, int64, int64, int64, int64, int64, error)
IsBlockDevice(devicePath string) (bool, error)
DeviceInfo(devicePath string) (int64, error)
IsPathNotExist(path string) bool
}

// VolumeStatUtils ...
type VolumeStatUtils struct {
}

// IsDevicePathNotExist ...
func (su *VolumeStatUtils) IsPathNotExist(path string) bool {
var stat unix.Stat_t
err := unix.Stat(path, &stat)
if err != nil {
if os.IsNotExist(err) {
return true
}
}
return false
}

// IsBlockDevice ...
func (su *VolumeStatUtils) IsBlockDevice(devicePath string) (bool, error) {
var stat unix.Stat_t
err := unix.Stat(devicePath, &stat)
if err != nil {
return false, err
}

return (stat.Mode & unix.S_IFMT) == unix.S_IFBLK, nil
}

// DeviceInfo ...
func (su *VolumeStatUtils) DeviceInfo(devicePath string) (int64, error) {
output, err := exec.Command("blockdev", "--getsize64", devicePath).CombinedOutput()
if err != nil {
return 0, fmt.Errorf("failed to get size of block volume at path %s: output: %s, err: %v", devicePath, string(output), err)
}
strOut := strings.TrimSpace(string(output))
gotSizeBytes, err := strconv.ParseInt(strOut, 10, 64)
if err != nil {
return 0, fmt.Errorf("failed to parse size '%s' into int", strOut)
}

return gotSizeBytes, nil
}

//FSInfo ...
func (su *VolumeStatUtils) FSInfo(path string) (int64, int64, int64, int64, int64, int64, error) {
return fs.Info(path)
}