Skip to content

Commit

Permalink
fix: create volume throttling issue
Browse files Browse the repository at this point in the history
fix issue
  • Loading branch information
andyzhangx committed Sep 18, 2020
1 parent 9c555ca commit cac0ae4
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 30 deletions.
36 changes: 29 additions & 7 deletions pkg/azurefile/azurefile.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,7 @@ func IsCorruptedDir(dir string) bool {
return pathErr != nil && mount.IsCorruptedMnt(pathErr)
}

// GetAccountInfo get account info
func (d *Driver) GetAccountInfo(volumeID string, secrets, reqContext map[string]string) (rgName, accountName, accountKey, fileShareName, diskName string, err error) {
if len(secrets) == 0 {
rgName, accountName, fileShareName, diskName, err = GetFileShareInfo(volumeID)
Expand Down Expand Up @@ -471,18 +472,16 @@ func isSupportedProtocol(protocol string) bool {
return false
}

// CreateFileShare creates a file share, using a matching storage account type, account kind, etc.
// storage account will be created if specified account is not found
func (d *Driver) CreateFileShare(accountOptions *azure.AccountOptions, shareOptions *fileclient.ShareOptions, secrets map[string]string) (string, string, error) {
// CreateFileShare creates a file share
func (d *Driver) CreateFileShare(accountOptions *azure.AccountOptions, shareOptions *fileclient.ShareOptions, secrets map[string]string) error {
if len(secrets) > 0 {
accountName, accountKey, err := getStorageAccount(secrets)
if err != nil {
return accountName, accountKey, err
return err
}
err = d.fileClient.CreateFileShare(accountName, accountKey, shareOptions)
return accountName, accountKey, err
return d.fileClient.CreateFileShare(accountName, accountKey, shareOptions)
}
return d.cloud.CreateFileShare(accountOptions, shareOptions)
return d.cloud.FileClient.CreateFileShare(accountOptions.ResourceGroup, accountOptions.Name, shareOptions)
}

// DeleteFileShare deletes a file share using storage account name and key
Expand All @@ -508,3 +507,26 @@ func (d *Driver) ResizeFileShare(resourceGroup, accountName, shareName string, s
}
return d.cloud.ResizeFileShare(resourceGroup, accountName, shareName, sizeGiB)
}

// GetStorageAccesskey get Azure storage account key
func (d *Driver) GetStorageAccesskey(accountOptions *azure.AccountOptions, secrets map[string]string, secretNamespace string) (string, error) {
if len(secrets) > 0 {
_, accountKey, err := getStorageAccount(secrets)
return accountKey, err
}

// read from k8s secret first
if d.cloud.KubeClient != nil {
secretName := fmt.Sprintf(secretNameTemplate, accountOptions.Name)
if secretNamespace == "" {
secretNamespace = defaultSecretNamespace
}
secret, err := d.cloud.KubeClient.CoreV1().Secrets(secretNamespace).Get(context.TODO(), secretName, metav1.GetOptions{})
if err != nil {
klog.V(5).Infof("could not get secret(%v): %v", secretName, err)
} else {
return string(secret.Data[defaultSecretAccountKey][:]), nil
}
}
return d.cloud.GetStorageAccesskey(accountOptions.Name, accountOptions.ResourceGroup)
}
51 changes: 28 additions & 23 deletions pkg/azurefile/controllerserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
return nil, status.Errorf(codes.InvalidArgument, "protocol(%s) is not supported, supported protocol list: %v", protocol, supportedProtocolList)
}

enableHTTPSTrafficOnly := true
shareProtocol := storage.SMB
if fsType == nfs || protocol == nfs {
protocol = nfs
Expand All @@ -122,6 +123,7 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
storeAccountKey = storeAccountKeyFalse
// reset protocol field (compatable with "fsType: nfs")
parameters[protocolField] = protocol
enableHTTPSTrafficOnly = false
}

fileShareSize := int(requestGiB)
Expand Down Expand Up @@ -151,8 +153,6 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
resourceGroup = d.cloud.ResourceGroup
}

enableHTTPSTrafficOnly := true

tags, err := azure.ConvertTagsToMap(customTags)
if err != nil {
return nil, err
Expand All @@ -168,15 +168,15 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
Tags: tags,
}

var retAccount, retAccountKey string
if len(req.GetSecrets()) == 0 { // check whether account is provided by secret
lockKey := account + sku + accountKind + resourceGroup + location
var accountName, accountKey string
if len(req.GetSecrets()) == 0 && accountName == "" {
lockKey := sku + accountKind + resourceGroup + location
d.volLockMap.LockEntry(lockKey)
defer d.volLockMap.UnlockEntry(lockKey)

err = wait.ExponentialBackoff(d.cloud.RequestBackoff(), func() (bool, error) {
var retErr error
retAccount, retAccountKey, retErr = d.cloud.EnsureStorageAccount(accountOptions, fileShareAccountNamePrefix)
accountName, accountKey, retErr = d.cloud.EnsureStorageAccount(accountOptions, fileShareAccountNamePrefix)
if isRetriableError(retErr) {
klog.Warningf("EnsureStorageAccount(%s) failed with error(%v), waiting for retring", account, retErr)
return false, nil
Expand All @@ -188,38 +188,38 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
}
}

if quota, err := d.getFileShareQuota(resourceGroup, retAccount, validFileShareName, req.GetSecrets()); err != nil {
if quota, err := d.getFileShareQuota(resourceGroup, accountName, validFileShareName, req.GetSecrets()); err != nil {
return nil, status.Errorf(codes.Internal, err.Error())
} else if quota != -1 && quota != fileShareSize {
return nil, status.Errorf(codes.AlreadyExists, "request file share(%s) already exists, but its capacity(%v) is different from (%v)", validFileShareName, quota, fileShareSize)
}

accountOptions.Name = accountName
shareOptions := &fileclient.ShareOptions{
Name: validFileShareName,
Protocol: shareProtocol,
RequestGiB: fileShareSize,
}

klog.V(2).Infof("begin to create file share(%s) on account(%s) type(%s) rg(%s) location(%s) size(%d) protocol(%s)", validFileShareName, retAccount, sku, resourceGroup, location, fileShareSize, shareProtocol)
klog.V(2).Infof("begin to create file share(%s) on account(%s) type(%s) rg(%s) location(%s) size(%d) protocol(%s)", validFileShareName, accountName, sku, resourceGroup, location, fileShareSize, shareProtocol)
err = wait.ExponentialBackoff(d.cloud.RequestBackoff(), func() (bool, error) {
var retErr error
retAccount, retAccountKey, retErr = d.CreateFileShare(accountOptions, shareOptions, req.GetSecrets())
if isRetriableError(retErr) {
klog.Warningf("CreateFileShare(%s) on account(%s) failed with error(%v), waiting for retring", validFileShareName, account, retErr)
err := d.CreateFileShare(accountOptions, shareOptions, req.GetSecrets())
if isRetriableError(err) {
klog.Warningf("CreateFileShare(%s) on account(%s) failed with error(%v), waiting for retring", validFileShareName, account, err)
return false, nil
}
return true, retErr
return true, err
})
if err != nil {
return nil, fmt.Errorf("failed to create file share(%s) on account(%s) type(%s) rg(%s) location(%s) size(%d), error: %v", validFileShareName, account, sku, resourceGroup, location, fileShareSize, err)
}
if retAccount == "" || retAccountKey == "" {
return nil, fmt.Errorf("create file share(%s) on account(%s) type(%s) rg(%s) location(%s) size(%d) timeout(3m)", validFileShareName, account, sku, resourceGroup, location, fileShareSize)
}
klog.V(2).Infof("create file share %s on storage account %s successfully", validFileShareName, retAccount)
klog.V(2).Infof("create file share %s on storage account %s successfully", validFileShareName, accountName)

isDiskMount := isDiskFsType(fsType)
if isDiskMount && diskName == "" {
if isDiskFsType(fsType) && diskName == "" {
if accountKey == "" {
if accountKey, err = d.GetStorageAccesskey(accountOptions, req.GetSecrets(), secretNamespace); err != nil {
return nil, fmt.Errorf("failed to GetStorageAccesskey on account(%s) rg(%s), error: %v", accountOptions.Name, accountOptions.ResourceGroup, err)
}
}
if fileShareName == "" {
// use pvc name as vhd disk name if file share not specified
diskName = validFileShareName + vhdSuffix
Expand All @@ -230,7 +230,7 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
diskSizeBytes := volumehelper.GiBToBytes(requestGiB)
klog.V(2).Infof("begin to create vhd file(%s) size(%d) on share(%s) on account(%s) type(%s) rg(%s) location(%s)",
diskName, diskSizeBytes, validFileShareName, account, sku, resourceGroup, location)
if err := createDisk(ctx, retAccount, retAccountKey, d.cloud.Environment.StorageEndpointSuffix, validFileShareName, diskName, diskSizeBytes); err != nil {
if err := createDisk(ctx, accountName, accountKey, d.cloud.Environment.StorageEndpointSuffix, validFileShareName, diskName, diskSizeBytes); err != nil {
return nil, status.Errorf(codes.Internal, "failed to create VHD disk: %v", err)
}
klog.V(2).Infof("create vhd file(%s) size(%d) on share(%s) on account(%s) type(%s) rg(%s) location(%s) successfully",
Expand All @@ -239,7 +239,12 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
}

if storeAccountKey != storeAccountKeyFalse && len(req.GetSecrets()) == 0 {
secretName, err := setAzureCredentials(d.cloud.KubeClient, retAccount, retAccountKey, secretNamespace)
if accountKey == "" {
if accountKey, err = d.GetStorageAccesskey(accountOptions, req.GetSecrets(), secretNamespace); err != nil {
return nil, fmt.Errorf("failed to GetStorageAccesskey on account(%s) rg(%s), error: %v", accountOptions.Name, accountOptions.ResourceGroup, err)
}
}
secretName, err := setAzureCredentials(d.cloud.KubeClient, accountName, accountKey, secretNamespace)
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to store storage account key: %v", err)
}
Expand All @@ -248,7 +253,7 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
}
}

volumeID := fmt.Sprintf(volumeIDTemplate, resourceGroup, retAccount, validFileShareName, diskName)
volumeID := fmt.Sprintf(volumeIDTemplate, resourceGroup, accountName, validFileShareName, diskName)
return &csi.CreateVolumeResponse{
Volume: &csi.Volume{
VolumeId: volumeID,
Expand Down

0 comments on commit cac0ae4

Please sign in to comment.