Skip to content

Commit

Permalink
Merge pull request #110 from dharaneeshvrd/NodeGetVolumeStats
Browse files Browse the repository at this point in the history
GET_VOLUME_STATS node capability added
  • Loading branch information
Power Cloud Robot authored Jan 28, 2022
2 parents 74c35cb + 4ba7312 commit 24a1494
Show file tree
Hide file tree
Showing 4 changed files with 346 additions and 1 deletion.
98 changes: 98 additions & 0 deletions pkg/driver/mocks/mock_stats.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

70 changes: 69 additions & 1 deletion pkg/driver/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ var (
nodeCaps = []csi.NodeServiceCapability_RPC_Type{
csi.NodeServiceCapability_RPC_STAGE_UNSTAGE_VOLUME,
csi.NodeServiceCapability_RPC_EXPAND_VOLUME,
csi.NodeServiceCapability_RPC_GET_VOLUME_STATS,
}
)

Expand All @@ -66,6 +67,7 @@ type nodeService struct {
driverOptions *Options
pvmInstanceId string
volumeLocks *util.VolumeLocks
stats StatsUtils
}

// newNodeService creates a new node service
Expand All @@ -88,6 +90,7 @@ func newNodeService(driverOptions *Options) nodeService {
driverOptions: driverOptions,
pvmInstanceId: metadata.GetPvmInstanceId(),
volumeLocks: util.NewVolumeLocks(),
stats: &VolumeStatUtils{},
}
}

Expand Down Expand Up @@ -380,7 +383,72 @@ func (d *nodeService) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu
}

func (d *nodeService) NodeGetVolumeStats(ctx context.Context, req *csi.NodeGetVolumeStatsRequest) (*csi.NodeGetVolumeStatsResponse, error) {
return nil, status.Error(codes.Unimplemented, "NodeGetVolumeStats is not implemented yet")
klog.V(4).Infof("NodeGetVolumeStats: called with args %+v", *req)
var resp *csi.NodeGetVolumeStatsResponse

if req == nil || req.VolumeId == "" {
return nil, status.Error(codes.InvalidArgument, "Volume ID not provided")
}

if req.VolumePath == "" {
return nil, status.Error(codes.InvalidArgument, "VolumePath not provided")
}

volumePath := req.VolumePath
// return if path does not exist
if d.stats.IsPathNotExist(volumePath) {
return nil, status.Error(codes.NotFound, "VolumePath not exist")
}

// check if volume mode is raw volume mode
isBlock, err := d.stats.IsBlockDevice(volumePath)
if err != nil {
return nil, status.Error(codes.Internal, fmt.Sprintf("Failed to check volume %s is block device or not: %v", req.VolumeId, err))
}
// if block device, get deviceStats
if isBlock {
capacity, err := d.stats.DeviceInfo(volumePath)
if err != nil {
return nil, status.Error(codes.Internal, fmt.Sprintf("Failed to collect block device info: %v", err))
}

resp = &csi.NodeGetVolumeStatsResponse{
Usage: []*csi.VolumeUsage{
{
Total: capacity,
Unit: csi.VolumeUsage_BYTES,
},
},
}

klog.V(4).Infof("Block Device Volume stats collected: %+v\n", resp)
return resp, nil
}

// else get the file system stats
available, capacity, usage, inodes, inodesFree, inodesUsed, err := d.stats.FSInfo(volumePath)
if err != nil {
return nil, status.Error(codes.Internal, fmt.Sprintf("Failed to collect FSInfo: %v", err))
}
resp = &csi.NodeGetVolumeStatsResponse{
Usage: []*csi.VolumeUsage{
{
Available: available,
Total: capacity,
Used: usage,
Unit: csi.VolumeUsage_BYTES,
},
{
Available: inodesFree,
Total: inodes,
Used: inodesUsed,
Unit: csi.VolumeUsage_INODES,
},
},
}

klog.V(4).Infof("FS Volume stats collected: %+v\n", resp)
return resp, nil
}

func (d *nodeService) NodeGetCapabilities(ctx context.Context, req *csi.NodeGetCapabilitiesRequest) (*csi.NodeGetCapabilitiesResponse, error) {
Expand Down
112 changes: 112 additions & 0 deletions pkg/driver/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package driver
import (
"context"
"errors"
"fmt"
"reflect"
"testing"

Expand Down Expand Up @@ -882,6 +883,117 @@ func TestNodeGetInfo(t *testing.T) {
}
}

func TestNodeGetVolumeStats(t *testing.T) {
testCases := []struct {
name string
testFunc func(t *testing.T)
}{
{name: "success block device volume",
testFunc: func(t *testing.T) {
mockCtl := gomock.NewController(t)
defer mockCtl.Finish()
mockStatUtils := mocks.NewMockStatsUtils(mockCtl)

volumePath := "./test"
var mockCapacity int64 = 100
mockStatUtils.EXPECT().IsPathNotExist(volumePath).Return(false)
mockStatUtils.EXPECT().IsBlockDevice(volumePath).Return(true, nil)
mockStatUtils.EXPECT().DeviceInfo(volumePath).Return(mockCapacity, nil)
driver := &nodeService{stats: mockStatUtils}

req := csi.NodeGetVolumeStatsRequest{VolumeId: volumeID, VolumePath: volumePath}

resp, err := driver.NodeGetVolumeStats(context.TODO(), &req)
if err != nil {
t.Fatalf("Expect no error but got: %v", err)
}

if resp.Usage[0].Total != mockCapacity {
t.Fatalf("Expected total capacity as %d, got %d", mockCapacity, resp.Usage[0].Total)
}
},
}, {
name: "failure path not exist",
testFunc: func(t *testing.T) {
mockCtl := gomock.NewController(t)
defer mockCtl.Finish()
mockStatUtils := mocks.NewMockStatsUtils(mockCtl)

volumePath := "./test"
mockStatUtils.EXPECT().IsPathNotExist(volumePath).Return(true)
driver := &nodeService{stats: mockStatUtils}

req := csi.NodeGetVolumeStatsRequest{VolumeId: volumeID, VolumePath: volumePath}

_, err := driver.NodeGetVolumeStats(context.TODO(), &req)
expectErr(t, err, codes.NotFound)
},
}, {
name: "failure checking for block device",
testFunc: func(t *testing.T) {
mockCtl := gomock.NewController(t)
defer mockCtl.Finish()
mockStatUtils := mocks.NewMockStatsUtils(mockCtl)

volumePath := "./test"
mockStatUtils.EXPECT().IsPathNotExist(volumePath).Return(false)
mockStatUtils.EXPECT().IsBlockDevice(volumePath).Return(false, errors.New("Error checking for block device"))
driver := &nodeService{stats: mockStatUtils}

req := csi.NodeGetVolumeStatsRequest{VolumeId: volumeID, VolumePath: volumePath}

_, err := driver.NodeGetVolumeStats(context.TODO(), &req)
expectErr(t, err, codes.Internal)
},
}, {
name: "failure collecting block device info",
testFunc: func(t *testing.T) {
mockCtl := gomock.NewController(t)
defer mockCtl.Finish()
mockStatUtils := mocks.NewMockStatsUtils(mockCtl)

volumePath := "./test"
mockStatUtils.EXPECT().IsPathNotExist(volumePath).Return(false)
mockStatUtils.EXPECT().IsBlockDevice(volumePath).Return(true, nil)
mockStatUtils.EXPECT().DeviceInfo(volumePath).Return(int64(0), errors.New("Error collecting block device info"))

driver := &nodeService{stats: mockStatUtils}

req := csi.NodeGetVolumeStatsRequest{VolumeId: volumeID, VolumePath: volumePath}

_, err := driver.NodeGetVolumeStats(context.TODO(), &req)
fmt.Println(err)
expectErr(t, err, codes.Internal)
},
},
{
name: "failure collecting fs info",
testFunc: func(t *testing.T) {
mockCtl := gomock.NewController(t)
defer mockCtl.Finish()
mockStatUtils := mocks.NewMockStatsUtils(mockCtl)

volumePath := "./test"
mockStatUtils.EXPECT().IsPathNotExist(volumePath).Return(false)
mockStatUtils.EXPECT().IsBlockDevice(volumePath).Return(false, nil)
var statUnit int64 = 0
mockStatUtils.EXPECT().FSInfo(volumePath).Return(statUnit, statUnit, statUnit, statUnit, statUnit, statUnit, errors.New("Error collecting FS Info"))
driver := &nodeService{stats: mockStatUtils}

req := csi.NodeGetVolumeStatsRequest{VolumeId: volumeID, VolumePath: volumePath}

_, err := driver.NodeGetVolumeStats(context.TODO(), &req)
fmt.Println(err)

expectErr(t, err, codes.Internal)
},
},
}
for _, tc := range testCases {
t.Run(tc.name, tc.testFunc)
}
}

func expectErr(t *testing.T, actualErr error, expectedCode codes.Code) {
if actualErr == nil {
t.Fatalf("Expect error but got no error")
Expand Down
67 changes: 67 additions & 0 deletions pkg/driver/stats.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package driver

import (
"fmt"
"os"
"os/exec"
"strconv"
"strings"

"golang.org/x/sys/unix"
"k8s.io/kubernetes/pkg/volume/util/fs"
)

// StatsUtils ...
type StatsUtils interface {
FSInfo(path string) (int64, int64, int64, int64, int64, int64, error)
IsBlockDevice(devicePath string) (bool, error)
DeviceInfo(devicePath string) (int64, error)
IsPathNotExist(path string) bool
}

// VolumeStatUtils ...
type VolumeStatUtils struct {
}

// IsDevicePathNotExist ...
func (su *VolumeStatUtils) IsPathNotExist(path string) bool {
var stat unix.Stat_t
err := unix.Stat(path, &stat)
if err != nil {
if os.IsNotExist(err) {
return true
}
}
return false
}

// IsBlockDevice ...
func (su *VolumeStatUtils) IsBlockDevice(devicePath string) (bool, error) {
var stat unix.Stat_t
err := unix.Stat(devicePath, &stat)
if err != nil {
return false, err
}

return (stat.Mode & unix.S_IFMT) == unix.S_IFBLK, nil
}

// DeviceInfo ...
func (su *VolumeStatUtils) DeviceInfo(devicePath string) (int64, error) {
output, err := exec.Command("blockdev", "--getsize64", devicePath).CombinedOutput()
if err != nil {
return 0, fmt.Errorf("failed to get size of block volume at path %s: output: %s, err: %v", devicePath, string(output), err)
}
strOut := strings.TrimSpace(string(output))
gotSizeBytes, err := strconv.ParseInt(strOut, 10, 64)
if err != nil {
return 0, fmt.Errorf("failed to parse size '%s' into int", strOut)
}

return gotSizeBytes, nil
}

//FSInfo ...
func (su *VolumeStatUtils) FSInfo(path string) (int64, int64, int64, int64, int64, int64, error) {
return fs.Info(path)
}

0 comments on commit 24a1494

Please sign in to comment.