diff --git a/pkg/driver/node.go b/pkg/driver/node.go index e3a6fc63..d361c8b1 100644 --- a/pkg/driver/node.go +++ b/pkg/driver/node.go @@ -37,8 +37,8 @@ var ( nodeCaps = []csi.NodeServiceCapability_RPC_Type{} ) -// VolumeOperationAlreadyExists is message fmt returned to CO when there is another in-flight call on the given volumeID -const VolumeOperationAlreadyExists = "An operation with the given volume=%q is already in progress" +// VolumeOperationAlreadyExists is message fmt returned to CO when there is another in-flight call on the given rpcKey +const VolumeOperationAlreadyExists = "An operation with the given volume=%q and target=%q is already in progress" type nodeService struct { metadata cloud.MetadataService @@ -120,12 +120,14 @@ func (d *nodeService) NodePublishVolume(ctx context.Context, req *csi.NodePublis return nil, status.Error(codes.InvalidArgument, "Volume capability not supported") } - if ok := d.inFlight.Insert(volumeID); !ok { - return nil, status.Errorf(codes.Aborted, VolumeOperationAlreadyExists, volumeID) + rpcKey := fmt.Sprintf("%s-%s", volumeID, target) + + if ok := d.inFlight.Insert(rpcKey); !ok { + return nil, status.Errorf(codes.Aborted, VolumeOperationAlreadyExists, volumeID, target) } defer func() { - klog.V(4).InfoS("NodePublishVolume: volume operation finished", "volumeId", volumeID) - d.inFlight.Delete(volumeID) + klog.V(4).InfoS("NodePublishVolume: volume operation finished", "rpcKey", rpcKey) + d.inFlight.Delete(rpcKey) }() mountOptions := []string{} @@ -182,12 +184,13 @@ func (d *nodeService) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu return nil, status.Error(codes.InvalidArgument, "Target path not provided") } - if ok := d.inFlight.Insert(volumeID); !ok { - return nil, status.Errorf(codes.Aborted, VolumeOperationAlreadyExists, volumeID) + rpcKey := fmt.Sprintf("%s-%s", volumeID, target) + if ok := d.inFlight.Insert(rpcKey); !ok { + return nil, status.Errorf(codes.Aborted, VolumeOperationAlreadyExists, volumeID, target) } defer func() { - klog.V(4).InfoS("NodeUnpublishVolume: volume operation finished", "volumeId", volumeID) - d.inFlight.Delete(volumeID) + klog.V(4).InfoS("NodeUnpublishVolume: volume operation finished", "rpcKey", rpcKey) + d.inFlight.Delete(rpcKey) }() // Check if the target is mounted before unmounting diff --git a/pkg/driver/node_test.go b/pkg/driver/node_test.go index 3b87733d..e031ee6c 100644 --- a/pkg/driver/node_test.go +++ b/pkg/driver/node_test.go @@ -40,10 +40,11 @@ var ( func TestNodePublishVolume(t *testing.T) { var ( - dnsname = "fs-0a2d0632b5ff567e9.fsx.us-west-2.amazonaws.com" - mountname = "random" - targetPath = "/target/path" - stdVolCap = &csi.VolumeCapability{ + dnsname = "fs-0a2d0632b5ff567e9.fsx.us-west-2.amazonaws.com" + mountname = "random" + targetPath = "/target/path" + targetPathAlt = "/target/alt_path" + stdVolCap = &csi.VolumeCapability{ AccessType: &csi.VolumeCapability_Mount{ Mount: &csi.VolumeCapability_MountVolume{}, }, @@ -438,7 +439,7 @@ func TestNodePublishVolume(t *testing.T) { }, }, { - name: "fail another operation in-flight on given volumeId", + name: "fail another operation in-flight on given volumeId-targetPath", testFunc: func(t *testing.T) { mockCtl := gomock.NewController(t) defer mockCtl.Finish() @@ -462,11 +463,56 @@ func TestNodePublishVolume(t *testing.T) { TargetPath: targetPath, } - awsDriver.inFlight.Insert(volumeID) + rpcKey := fmt.Sprintf("%s-%s", volumeID, targetPath) + + awsDriver.inFlight.Insert(rpcKey) _, err := awsDriver.NodePublishVolume(context.TODO(), req) expectErr(t, err, codes.Aborted) }, }, + { + name: "success: operation in-flight with different volumeId-targetPath", + testFunc: func(t *testing.T) { + mockCtl := gomock.NewController(t) + defer mockCtl.Finish() + + mockMetadata := cloudMock.NewMockMetadataService(mockCtl) + mockMounter := driverMocks.NewMockMounter(mockCtl) + + awsDriver := &nodeService{ + metadata: mockMetadata, + mounter: mockMounter, + inFlight: internal.NewInFlight(), + } + + source := dnsname + "@tcp:/" + mountname + + ctx := context.Background() + req := &csi.NodePublishVolumeRequest{ + VolumeId: "volumeId", + VolumeContext: map[string]string{ + volumeContextDnsName: dnsname, + volumeContextMountName: mountname, + }, + VolumeCapability: stdVolCap, + TargetPath: targetPath, + } + + rpcKeyAlt := fmt.Sprintf("%s-%s", volumeID, targetPathAlt) + + awsDriver.inFlight.Insert(rpcKeyAlt) + + mockMounter.EXPECT().MakeDir(gomock.Eq(targetPath)).Return(nil) + mockMounter.EXPECT().IsLikelyNotMountPoint(gomock.Eq(targetPath)).Return(true, nil) + mockMounter.EXPECT().Mount(gomock.Eq(source), gomock.Eq(targetPath), gomock.Eq("lustre"), gomock.Any()).Return(nil) + _, err := awsDriver.NodePublishVolume(ctx, req) + if err != nil { + t.Fatalf("NodePublishVolume is failed: %v", err) + } + + mockCtl.Finish() + }, + }, } for _, tc := range testCases { @@ -477,7 +523,8 @@ func TestNodePublishVolume(t *testing.T) { func TestNodeUnpublishVolume(t *testing.T) { var ( - targetPath = "/target/path" + targetPath = "/target/path" + targetPathAlt = "/target/alt_path" ) testCases := []struct { @@ -601,7 +648,7 @@ func TestNodeUnpublishVolume(t *testing.T) { }, }, { - name: "fail another operation in-flight on given volumeId", + name: "fail another operation in-flight on given volumeId-targetPath", testFunc: func(t *testing.T) { mockCtl := gomock.NewController(t) defer mockCtl.Finish() @@ -620,11 +667,46 @@ func TestNodeUnpublishVolume(t *testing.T) { TargetPath: targetPath, } - awsDriver.inFlight.Insert(volumeID) + rpcKey := fmt.Sprintf("%s-%s", volumeID, targetPath) + + awsDriver.inFlight.Insert(rpcKey) _, err := awsDriver.NodeUnpublishVolume(context.TODO(), req) expectErr(t, err, codes.Aborted) }, }, + { + name: "success: operation in-flight with different volumeId-targetPath", + testFunc: func(t *testing.T) { + mockCtl := gomock.NewController(t) + defer mockCtl.Finish() + + mockMetadata := cloudMock.NewMockMetadataService(mockCtl) + mockMounter := driverMocks.NewMockMounter(mockCtl) + + awsDriver := &nodeService{ + metadata: mockMetadata, + mounter: mockMounter, + inFlight: internal.NewInFlight(), + } + + ctx := context.Background() + req := &csi.NodeUnpublishVolumeRequest{ + VolumeId: "volumeId", + TargetPath: targetPath, + } + + rpcKeyAlt := fmt.Sprintf("%s-%s", volumeID, targetPathAlt) + awsDriver.inFlight.Insert(rpcKeyAlt) + + mockMounter.EXPECT().IsLikelyNotMountPoint(gomock.Eq(targetPath)).Return(false, nil) + mockMounter.EXPECT().Unmount(gomock.Eq(targetPath)).Return(nil) + + _, err := awsDriver.NodeUnpublishVolume(ctx, req) + if err != nil { + t.Fatalf("NodeUnpublishVolume is failed: %v", err) + } + }, + }, } for _, tc := range testCases { t.Run(tc.name, tc.testFunc)