Skip to content

Commit

Permalink
feat: use file metadata for vhd disk lock
Browse files Browse the repository at this point in the history
fix test failure
  • Loading branch information
andyzhangx committed Mar 9, 2020
1 parent b7567a0 commit 4cc2447
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 4 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ require (
k8s.io/api v0.0.0
k8s.io/apimachinery v0.0.0
k8s.io/client-go v0.0.0
k8s.io/cloud-provider v0.0.0
k8s.io/klog v1.0.0
k8s.io/kubernetes v1.15.0
k8s.io/legacy-cloud-providers v0.0.0
Expand Down
2 changes: 2 additions & 0 deletions pkg/azurefile/azurefile.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ const (
fsTypeField = "fstype"
proxyMount = "proxy-mount"
cifs = "cifs"
metaDataNode = "node"
)

// Driver implements all interfaces of CSI drivers
Expand Down Expand Up @@ -112,6 +113,7 @@ func (d *Driver) Run(endpoint string) {
d.AddControllerServiceCapabilities(
[]csi.ControllerServiceCapability_RPC_Type{
csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME,
csi.ControllerServiceCapability_RPC_PUBLISH_UNPUBLISH_VOLUME,
csi.ControllerServiceCapability_RPC_CREATE_DELETE_SNAPSHOT,
//csi.ControllerServiceCapability_RPC_LIST_SNAPSHOTS,
csi.ControllerServiceCapability_RPC_EXPAND_VOLUME,
Expand Down
106 changes: 102 additions & 4 deletions pkg/azurefile/controllerserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ import (
"github.com/pborman/uuid"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"k8s.io/apimachinery/pkg/types"
"k8s.io/cloud-provider"
"k8s.io/klog"
)

Expand Down Expand Up @@ -214,15 +216,111 @@ func (d *Driver) ListVolumes(ctx context.Context, req *csi.ListVolumesRequest) (
}

// ControllerPublishVolume make a volume available on some required node
// N/A for azure file
func (d *Driver) ControllerPublishVolume(ctx context.Context, req *csi.ControllerPublishVolumeRequest) (*csi.ControllerPublishVolumeResponse, error) {
return nil, status.Error(codes.Unimplemented, "")
klog.V(2).Infof("ControllerPublishVolume: called with args %+v", *req)
volumeID := req.GetVolumeId()
if len(volumeID) == 0 {
return nil, status.Error(codes.InvalidArgument, "Volume ID not provided")
}

nodeID := req.GetNodeId()
if len(nodeID) == 0 {
return nil, status.Error(codes.InvalidArgument, "Node ID not provided")
}
nodeName := types.NodeName(nodeID)
if _, err := d.cloud.InstanceID(ctx, nodeName); err != nil {
if err == cloudprovider.InstanceNotFound {
return nil, status.Error(codes.NotFound, fmt.Sprintf("failed to get azure instance id for node %q (%v)", nodeName, err))
}
return nil, status.Error(codes.Internal, fmt.Sprintf("get azure instance id for node %q failed with %v", nodeName, err))
}

resourceGroupName, accountName, fileShareName, diskName, err := getFileShareInfo(volumeID)
if err != nil {
return nil, status.Error(codes.InvalidArgument, fmt.Sprintf("getFileShareInfo(%s) failed with error: %v", volumeID, err))
}
if diskName == "" {
klog.V(2).Infof("skip ControllerPublishVolume process since disk name is empty, volumeid: %s", volumeID)
return &csi.ControllerPublishVolumeResponse{}, nil
}

if resourceGroupName == "" {
resourceGroupName = d.cloud.ResourceGroup
}
accountKey, err := d.cloud.GetStorageAccesskey(accountName, resourceGroupName)
if err != nil {
return nil, status.Error(codes.Internal, fmt.Sprintf("could not find key for storage account(%s) under resource group(%s), err %v", accountName, resourceGroupName, err))
}

storageEndpointSuffix := d.cloud.Environment.StorageEndpointSuffix
fileURL, err := getFileURL(accountName, accountKey, storageEndpointSuffix, fileShareName, diskName)
if err != nil {
return nil, status.Error(codes.Internal, fmt.Sprintf("getFileURL(%s,%s,%s,%s) returned with error: %v", accountName, storageEndpointSuffix, fileShareName, diskName, err))
}
if fileURL == nil {
return nil, status.Error(codes.Internal, fmt.Sprintf("getFileURL(%s,%s,%s,%s) return empty fileURL", accountName, storageEndpointSuffix, fileShareName, diskName))
}

properties, err := fileURL.GetProperties(ctx)
if err != nil {
return nil, status.Error(codes.Internal, fmt.Sprintf("GetProperties for volumeid(%s) on node(%s) returned with error: %v", volumeID, nodeID, err))
}

if v, ok := properties.NewMetadata()[metaDataNode]; ok {
if v != "" {
return nil, status.Error(codes.Internal, fmt.Sprintf("volumeid(%s) cannot be attached to node(%s) since it's already attached to node(%s)", volumeID, nodeID, v))
}
}

if _, err = fileURL.SetMetadata(ctx, azfile.Metadata{metaDataNode: nodeID}); err != nil {
return nil, status.Error(codes.Internal, fmt.Sprintf("SetMetadata for volumeid(%s) on node(%s) returned with error: %v", volumeID, nodeID, err))
}
return &csi.ControllerPublishVolumeResponse{}, nil
}

// ControllerUnpublishVolume make the volume unavailable on a specified node
// N/A for azure file
func (d *Driver) ControllerUnpublishVolume(ctx context.Context, req *csi.ControllerUnpublishVolumeRequest) (*csi.ControllerUnpublishVolumeResponse, error) {
return nil, status.Error(codes.Unimplemented, "")
klog.V(2).Infof("ControllerUnpublishVolume: called with args %+v", *req)
volumeID := req.GetVolumeId()
if len(volumeID) == 0 {
return nil, status.Error(codes.InvalidArgument, "Volume ID not provided")
}

nodeID := req.GetNodeId()
if len(nodeID) == 0 {
return nil, status.Error(codes.InvalidArgument, "Node ID not provided")
}

resourceGroupName, accountName, fileShareName, diskName, err := getFileShareInfo(volumeID)
if err != nil {
return nil, status.Error(codes.InvalidArgument, fmt.Sprintf("getFileShareInfo(%s) failed with error: %v", volumeID, err))
}
if diskName == "" {
klog.V(2).Infof("skip ControllerUnpublishVolume process since disk name is empty, volumeid: %s", volumeID)
return &csi.ControllerUnpublishVolumeResponse{}, nil
}

if resourceGroupName == "" {
resourceGroupName = d.cloud.ResourceGroup
}
accountKey, err := d.cloud.GetStorageAccesskey(accountName, resourceGroupName)
if err != nil {
return nil, status.Error(codes.Internal, fmt.Sprintf("could not find key for storage account(%s) under resource group(%s), err %v", accountName, resourceGroupName, err))
}

storageEndpointSuffix := d.cloud.Environment.StorageEndpointSuffix
fileURL, err := getFileURL(accountName, accountKey, storageEndpointSuffix, fileShareName, diskName)
if err != nil {
return nil, status.Error(codes.Internal, fmt.Sprintf("getFileURL(%s,%s,%s,%s) returned with error: %v", accountName, storageEndpointSuffix, fileShareName, diskName, err))
}
if fileURL == nil {
return nil, status.Error(codes.Internal, fmt.Sprintf("getFileURL(%s,%s,%s,%s) return empty fileURL", accountName, storageEndpointSuffix, fileShareName, diskName))
}

if _, err = fileURL.SetMetadata(ctx, azfile.Metadata{metaDataNode: ""}); err != nil {
return nil, status.Error(codes.Internal, fmt.Sprintf("SetMetadata for volumeid(%s) on node(%s) returned with error: %v", volumeID, nodeID, err))
}
return &csi.ControllerUnpublishVolumeResponse{}, nil
}

// CreateSnapshot create a snapshot (todo)
Expand Down

0 comments on commit 4cc2447

Please sign in to comment.