From 4cc244780638578505c178d030e36f7765cc51e4 Mon Sep 17 00:00:00 2001 From: andyzhangx Date: Mon, 9 Mar 2020 06:27:56 +0000 Subject: [PATCH] feat: use file metadata for vhd disk lock fix test failure --- go.mod | 1 + pkg/azurefile/azurefile.go | 2 + pkg/azurefile/controllerserver.go | 106 ++++++++++++++++++++++++++++-- 3 files changed, 105 insertions(+), 4 deletions(-) diff --git a/go.mod b/go.mod index 4c19ea940f..9f3559e4b5 100644 --- a/go.mod +++ b/go.mod @@ -42,6 +42,7 @@ require ( k8s.io/api v0.0.0 k8s.io/apimachinery v0.0.0 k8s.io/client-go v0.0.0 + k8s.io/cloud-provider v0.0.0 k8s.io/klog v1.0.0 k8s.io/kubernetes v1.15.0 k8s.io/legacy-cloud-providers v0.0.0 diff --git a/pkg/azurefile/azurefile.go b/pkg/azurefile/azurefile.go index cc0ee1fddd..a55ff180dd 100644 --- a/pkg/azurefile/azurefile.go +++ b/pkg/azurefile/azurefile.go @@ -70,6 +70,7 @@ const ( fsTypeField = "fstype" proxyMount = "proxy-mount" cifs = "cifs" + metaDataNode = "node" ) // Driver implements all interfaces of CSI drivers @@ -112,6 +113,7 @@ func (d *Driver) Run(endpoint string) { d.AddControllerServiceCapabilities( []csi.ControllerServiceCapability_RPC_Type{ csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME, + csi.ControllerServiceCapability_RPC_PUBLISH_UNPUBLISH_VOLUME, csi.ControllerServiceCapability_RPC_CREATE_DELETE_SNAPSHOT, //csi.ControllerServiceCapability_RPC_LIST_SNAPSHOTS, csi.ControllerServiceCapability_RPC_EXPAND_VOLUME, diff --git a/pkg/azurefile/controllerserver.go b/pkg/azurefile/controllerserver.go index 356c735efa..c6eb7e207b 100644 --- a/pkg/azurefile/controllerserver.go +++ b/pkg/azurefile/controllerserver.go @@ -31,6 +31,8 @@ import ( "github.com/pborman/uuid" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" + "k8s.io/apimachinery/pkg/types" + "k8s.io/cloud-provider" "k8s.io/klog" ) @@ -214,15 +216,111 @@ func (d *Driver) ListVolumes(ctx context.Context, req *csi.ListVolumesRequest) ( } // ControllerPublishVolume make a volume available on some required node -// N/A for azure file func (d *Driver) ControllerPublishVolume(ctx context.Context, req *csi.ControllerPublishVolumeRequest) (*csi.ControllerPublishVolumeResponse, error) { - return nil, status.Error(codes.Unimplemented, "") + klog.V(2).Infof("ControllerPublishVolume: called with args %+v", *req) + volumeID := req.GetVolumeId() + if len(volumeID) == 0 { + return nil, status.Error(codes.InvalidArgument, "Volume ID not provided") + } + + nodeID := req.GetNodeId() + if len(nodeID) == 0 { + return nil, status.Error(codes.InvalidArgument, "Node ID not provided") + } + nodeName := types.NodeName(nodeID) + if _, err := d.cloud.InstanceID(ctx, nodeName); err != nil { + if err == cloudprovider.InstanceNotFound { + return nil, status.Error(codes.NotFound, fmt.Sprintf("failed to get azure instance id for node %q (%v)", nodeName, err)) + } + return nil, status.Error(codes.Internal, fmt.Sprintf("get azure instance id for node %q failed with %v", nodeName, err)) + } + + resourceGroupName, accountName, fileShareName, diskName, err := getFileShareInfo(volumeID) + if err != nil { + return nil, status.Error(codes.InvalidArgument, fmt.Sprintf("getFileShareInfo(%s) failed with error: %v", volumeID, err)) + } + if diskName == "" { + klog.V(2).Infof("skip ControllerPublishVolume process since disk name is empty, volumeid: %s", volumeID) + return &csi.ControllerPublishVolumeResponse{}, nil + } + + if resourceGroupName == "" { + resourceGroupName = d.cloud.ResourceGroup + } + accountKey, err := d.cloud.GetStorageAccesskey(accountName, resourceGroupName) + if err != nil { + return nil, status.Error(codes.Internal, fmt.Sprintf("could not find key for storage account(%s) under resource group(%s), err %v", accountName, resourceGroupName, err)) + } + + storageEndpointSuffix := d.cloud.Environment.StorageEndpointSuffix + fileURL, err := getFileURL(accountName, accountKey, storageEndpointSuffix, fileShareName, diskName) + if err != nil { + return nil, status.Error(codes.Internal, fmt.Sprintf("getFileURL(%s,%s,%s,%s) returned with error: %v", accountName, storageEndpointSuffix, fileShareName, diskName, err)) + } + if fileURL == nil { + return nil, status.Error(codes.Internal, fmt.Sprintf("getFileURL(%s,%s,%s,%s) return empty fileURL", accountName, storageEndpointSuffix, fileShareName, diskName)) + } + + properties, err := fileURL.GetProperties(ctx) + if err != nil { + return nil, status.Error(codes.Internal, fmt.Sprintf("GetProperties for volumeid(%s) on node(%s) returned with error: %v", volumeID, nodeID, err)) + } + + if v, ok := properties.NewMetadata()[metaDataNode]; ok { + if v != "" { + return nil, status.Error(codes.Internal, fmt.Sprintf("volumeid(%s) cannot be attached to node(%s) since it's already attached to node(%s)", volumeID, nodeID, v)) + } + } + + if _, err = fileURL.SetMetadata(ctx, azfile.Metadata{metaDataNode: nodeID}); err != nil { + return nil, status.Error(codes.Internal, fmt.Sprintf("SetMetadata for volumeid(%s) on node(%s) returned with error: %v", volumeID, nodeID, err)) + } + return &csi.ControllerPublishVolumeResponse{}, nil } // ControllerUnpublishVolume make the volume unavailable on a specified node -// N/A for azure file func (d *Driver) ControllerUnpublishVolume(ctx context.Context, req *csi.ControllerUnpublishVolumeRequest) (*csi.ControllerUnpublishVolumeResponse, error) { - return nil, status.Error(codes.Unimplemented, "") + klog.V(2).Infof("ControllerUnpublishVolume: called with args %+v", *req) + volumeID := req.GetVolumeId() + if len(volumeID) == 0 { + return nil, status.Error(codes.InvalidArgument, "Volume ID not provided") + } + + nodeID := req.GetNodeId() + if len(nodeID) == 0 { + return nil, status.Error(codes.InvalidArgument, "Node ID not provided") + } + + resourceGroupName, accountName, fileShareName, diskName, err := getFileShareInfo(volumeID) + if err != nil { + return nil, status.Error(codes.InvalidArgument, fmt.Sprintf("getFileShareInfo(%s) failed with error: %v", volumeID, err)) + } + if diskName == "" { + klog.V(2).Infof("skip ControllerUnpublishVolume process since disk name is empty, volumeid: %s", volumeID) + return &csi.ControllerUnpublishVolumeResponse{}, nil + } + + if resourceGroupName == "" { + resourceGroupName = d.cloud.ResourceGroup + } + accountKey, err := d.cloud.GetStorageAccesskey(accountName, resourceGroupName) + if err != nil { + return nil, status.Error(codes.Internal, fmt.Sprintf("could not find key for storage account(%s) under resource group(%s), err %v", accountName, resourceGroupName, err)) + } + + storageEndpointSuffix := d.cloud.Environment.StorageEndpointSuffix + fileURL, err := getFileURL(accountName, accountKey, storageEndpointSuffix, fileShareName, diskName) + if err != nil { + return nil, status.Error(codes.Internal, fmt.Sprintf("getFileURL(%s,%s,%s,%s) returned with error: %v", accountName, storageEndpointSuffix, fileShareName, diskName, err)) + } + if fileURL == nil { + return nil, status.Error(codes.Internal, fmt.Sprintf("getFileURL(%s,%s,%s,%s) return empty fileURL", accountName, storageEndpointSuffix, fileShareName, diskName)) + } + + if _, err = fileURL.SetMetadata(ctx, azfile.Metadata{metaDataNode: ""}); err != nil { + return nil, status.Error(codes.Internal, fmt.Sprintf("SetMetadata for volumeid(%s) on node(%s) returned with error: %v", volumeID, nodeID, err)) + } + return &csi.ControllerUnpublishVolumeResponse{}, nil } // CreateSnapshot create a snapshot (todo)