Skip to content

Commit

Permalink
Change node-level idempotency to volume-target
Browse files Browse the repository at this point in the history
  • Loading branch information
jacobwolfaws committed Nov 6, 2023
1 parent b02afc9 commit 265c7a9
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 19 deletions.
23 changes: 13 additions & 10 deletions pkg/driver/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ var (
nodeCaps = []csi.NodeServiceCapability_RPC_Type{}
)

// VolumeOperationAlreadyExists is message fmt returned to CO when there is another in-flight call on the given volumeID
const VolumeOperationAlreadyExists = "An operation with the given volume=%q is already in progress"
// VolumeOperationAlreadyExists is message fmt returned to CO when there is another in-flight call on the given rpcKey
const VolumeOperationAlreadyExists = "An operation with the given volume=%q and target=%q is already in progress"

type nodeService struct {
metadata cloud.MetadataService
Expand Down Expand Up @@ -120,12 +120,14 @@ func (d *nodeService) NodePublishVolume(ctx context.Context, req *csi.NodePublis
return nil, status.Error(codes.InvalidArgument, "Volume capability not supported")
}

if ok := d.inFlight.Insert(volumeID); !ok {
return nil, status.Errorf(codes.Aborted, VolumeOperationAlreadyExists, volumeID)
rpcKey := fmt.Sprintf("%s-%s", volumeID, target)

if ok := d.inFlight.Insert(rpcKey); !ok {
return nil, status.Errorf(codes.Aborted, VolumeOperationAlreadyExists, volumeID, target)
}
defer func() {
klog.V(4).InfoS("NodePublishVolume: volume operation finished", "volumeId", volumeID)
d.inFlight.Delete(volumeID)
klog.V(4).InfoS("NodePublishVolume: volume operation finished", "rpcKey", rpcKey)
d.inFlight.Delete(rpcKey)
}()

mountOptions := []string{}
Expand Down Expand Up @@ -182,12 +184,13 @@ func (d *nodeService) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu
return nil, status.Error(codes.InvalidArgument, "Target path not provided")
}

if ok := d.inFlight.Insert(volumeID); !ok {
return nil, status.Errorf(codes.Aborted, VolumeOperationAlreadyExists, volumeID)
rpcKey := fmt.Sprintf("%s-%s", volumeID, target)
if ok := d.inFlight.Insert(rpcKey); !ok {
return nil, status.Errorf(codes.Aborted, VolumeOperationAlreadyExists, volumeID, target)
}
defer func() {
klog.V(4).InfoS("NodeUnpublishVolume: volume operation finished", "volumeId", volumeID)
d.inFlight.Delete(volumeID)
klog.V(4).InfoS("NodeUnpublishVolume: volume operation finished", "rpcKey", rpcKey)
d.inFlight.Delete(rpcKey)
}()

// Check if the target is mounted before unmounting
Expand Down
100 changes: 91 additions & 9 deletions pkg/driver/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,11 @@ var (
func TestNodePublishVolume(t *testing.T) {

var (
dnsname = "fs-0a2d0632b5ff567e9.fsx.us-west-2.amazonaws.com"
mountname = "random"
targetPath = "/target/path"
stdVolCap = &csi.VolumeCapability{
dnsname = "fs-0a2d0632b5ff567e9.fsx.us-west-2.amazonaws.com"
mountname = "random"
targetPath = "/target/path"
targetPathAlt = "/target/alt_path"
stdVolCap = &csi.VolumeCapability{
AccessType: &csi.VolumeCapability_Mount{
Mount: &csi.VolumeCapability_MountVolume{},
},
Expand Down Expand Up @@ -438,7 +439,7 @@ func TestNodePublishVolume(t *testing.T) {
},
},
{
name: "fail another operation in-flight on given volumeId",
name: "fail another operation in-flight on given volumeId-targetPath",
testFunc: func(t *testing.T) {
mockCtl := gomock.NewController(t)
defer mockCtl.Finish()
Expand All @@ -462,11 +463,56 @@ func TestNodePublishVolume(t *testing.T) {
TargetPath: targetPath,
}

awsDriver.inFlight.Insert(volumeID)
rpcKey := fmt.Sprintf("%s-%s", volumeID, targetPath)

awsDriver.inFlight.Insert(rpcKey)
_, err := awsDriver.NodePublishVolume(context.TODO(), req)
expectErr(t, err, codes.Aborted)
},
},
{
name: "success: operation in-flight with different volumeId-targetPath",
testFunc: func(t *testing.T) {
mockCtl := gomock.NewController(t)
defer mockCtl.Finish()

mockMetadata := cloudMock.NewMockMetadataService(mockCtl)
mockMounter := driverMocks.NewMockMounter(mockCtl)

awsDriver := &nodeService{
metadata: mockMetadata,
mounter: mockMounter,
inFlight: internal.NewInFlight(),
}

source := dnsname + "@tcp:/" + mountname

ctx := context.Background()
req := &csi.NodePublishVolumeRequest{
VolumeId: "volumeId",
VolumeContext: map[string]string{
volumeContextDnsName: dnsname,
volumeContextMountName: mountname,
},
VolumeCapability: stdVolCap,
TargetPath: targetPath,
}

rpcKeyAlt := fmt.Sprintf("%s-%s", volumeID, targetPathAlt)

awsDriver.inFlight.Insert(rpcKeyAlt)

mockMounter.EXPECT().MakeDir(gomock.Eq(targetPath)).Return(nil)
mockMounter.EXPECT().IsLikelyNotMountPoint(gomock.Eq(targetPath)).Return(true, nil)
mockMounter.EXPECT().Mount(gomock.Eq(source), gomock.Eq(targetPath), gomock.Eq("lustre"), gomock.Any()).Return(nil)
_, err := awsDriver.NodePublishVolume(ctx, req)
if err != nil {
t.Fatalf("NodePublishVolume is failed: %v", err)
}

mockCtl.Finish()
},
},
}

for _, tc := range testCases {
Expand All @@ -477,7 +523,8 @@ func TestNodePublishVolume(t *testing.T) {
func TestNodeUnpublishVolume(t *testing.T) {

var (
targetPath = "/target/path"
targetPath = "/target/path"
targetPathAlt = "/target/alt_path"
)

testCases := []struct {
Expand Down Expand Up @@ -601,7 +648,7 @@ func TestNodeUnpublishVolume(t *testing.T) {
},
},
{
name: "fail another operation in-flight on given volumeId",
name: "fail another operation in-flight on given volumeId-targetPath",
testFunc: func(t *testing.T) {
mockCtl := gomock.NewController(t)
defer mockCtl.Finish()
Expand All @@ -620,11 +667,46 @@ func TestNodeUnpublishVolume(t *testing.T) {
TargetPath: targetPath,
}

awsDriver.inFlight.Insert(volumeID)
rpcKey := fmt.Sprintf("%s-%s", volumeID, targetPath)

awsDriver.inFlight.Insert(rpcKey)
_, err := awsDriver.NodeUnpublishVolume(context.TODO(), req)
expectErr(t, err, codes.Aborted)
},
},
{
name: "success: operation in-flight with different volumeId-targetPath",
testFunc: func(t *testing.T) {
mockCtl := gomock.NewController(t)
defer mockCtl.Finish()

mockMetadata := cloudMock.NewMockMetadataService(mockCtl)
mockMounter := driverMocks.NewMockMounter(mockCtl)

awsDriver := &nodeService{
metadata: mockMetadata,
mounter: mockMounter,
inFlight: internal.NewInFlight(),
}

ctx := context.Background()
req := &csi.NodeUnpublishVolumeRequest{
VolumeId: "volumeId",
TargetPath: targetPath,
}

rpcKeyAlt := fmt.Sprintf("%s-%s", volumeID, targetPathAlt)
awsDriver.inFlight.Insert(rpcKeyAlt)

mockMounter.EXPECT().IsLikelyNotMountPoint(gomock.Eq(targetPath)).Return(false, nil)
mockMounter.EXPECT().Unmount(gomock.Eq(targetPath)).Return(nil)

_, err := awsDriver.NodeUnpublishVolume(ctx, req)
if err != nil {
t.Fatalf("NodeUnpublishVolume is failed: %v", err)
}
},
},
}
for _, tc := range testCases {
t.Run(tc.name, tc.testFunc)
Expand Down

0 comments on commit 265c7a9

Please sign in to comment.