Skip to content

Commit

Permalink
avoid sending unncessary delete requests if delete is in progress
Browse files Browse the repository at this point in the history
  • Loading branch information
marwanad committed Jun 3, 2020
1 parent 777150a commit 29943f7
Showing 1 changed file with 74 additions and 29 deletions.
103 changes: 74 additions & 29 deletions cluster-autoscaler/cloudprovider/azure/azure_scale_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,9 +322,7 @@ func (scaleSet *ScaleSet) IncreaseSize(delta int) error {
}

// GetScaleSetVms returns list of nodes for the given scale set.
// Note that the list results is not used directly because their resource ID format
// is not consistent with Get results.
func (scaleSet *ScaleSet) GetScaleSetVms() ([]string, error) {
func (scaleSet *ScaleSet) GetScaleSetVms() ([]compute.VirtualMachineScaleSetVM, error) {
klog.V(4).Infof("GetScaleSetVms: starts")
ctx, cancel := getContextWithCancel()
defer cancel()
Expand All @@ -337,24 +335,7 @@ func (scaleSet *ScaleSet) GetScaleSetVms() ([]string, error) {
return nil, err
}

allVMs := make([]string, 0)
for _, vm := range vmList {
// The resource ID is empty string, which indicates the instance may be in deleting state.
if len(*vm.ID) == 0 {
continue
}

resourceID, err := convertResourceGroupNameToLower(*vm.ID)
if err != nil {
// This shouldn't happen. Log a waring message for tracking.
klog.Warningf("GetScaleSetVms.convertResourceGroupNameToLower failed with error: %v", err)
continue
}

allVMs = append(allVMs, resourceID)
}

return allVMs, nil
return vmList, nil
}

// DecreaseTargetSize decreases the target size of the node group. This function
Expand Down Expand Up @@ -407,6 +388,9 @@ func (scaleSet *ScaleSet) DeleteInstances(instances []*azureRef) error {
return err
}

scaleSet.instanceMutex.Lock()
defer scaleSet.instanceMutex.Unlock()

instanceIDs := []string{}
for _, instance := range instances {
asg, err := scaleSet.manager.GetAsgForInstance(instance)
Expand All @@ -418,6 +402,11 @@ func (scaleSet *ScaleSet) DeleteInstances(instances []*azureRef) error {
return fmt.Errorf("cannot delete instance (%s) which don't belong to the same Scale Set (%q)", instance.Name, commonAsg)
}

if cpi, found := scaleSet.getInstanceByProviderID(instance.Name); found && cpi.Status != nil && cpi.Status.State == cloudprovider.InstanceDeleting {
klog.V(3).Infof("Skipping deleting instance %s as its current state is deleting", instance.Name)
continue
}

instanceID, err := getLastSegment(instance.Name)
if err != nil {
klog.Errorf("getLastSegment failed with error: %v", err)
Expand All @@ -427,9 +416,16 @@ func (scaleSet *ScaleSet) DeleteInstances(instances []*azureRef) error {
instanceIDs = append(instanceIDs, instanceID)
}

// nothing to delete
if len(instanceIDs) == 0 {
klog.V(3).Infof("No new instances eligible for deletion, skipping")
return nil
}

requiredIds := &compute.VirtualMachineScaleSetVMInstanceRequiredIDs{
InstanceIds: &instanceIDs,
}

ctx, cancel := getContextWithCancel()
defer cancel()
resourceGroup := scaleSet.manager.config.ResourceGroup
Expand Down Expand Up @@ -667,16 +663,65 @@ func (scaleSet *ScaleSet) Nodes() ([]cloudprovider.Instance, error) {
return nil, err
}

instances := make([]cloudprovider.Instance, len(vms))
for i := range vms {
name := "azure://" + vms[i]
instances[i] = cloudprovider.Instance{Id: name}
}

scaleSet.instanceCache = instances
scaleSet.instanceCache = buildInstanceCache(vms)
scaleSet.lastInstanceRefresh = time.Now()
klog.V(4).Infof("Nodes: returns")
return instances, nil
return scaleSet.instanceCache, nil
}

// Note that the GetScaleSetVms() results is not used directly because for the List endpoint,
// their resource ID format is not consistent with Get endpoint
func buildInstanceCache(vms []compute.VirtualMachineScaleSetVM) []cloudprovider.Instance {
instances := []cloudprovider.Instance{}

for _, vm := range vms {
// The resource ID is empty string, which indicates the instance may be in deleting state.
if len(*vm.ID) == 0 {
continue
}

resourceID, err := convertResourceGroupNameToLower(*vm.ID)
if err != nil {
// This shouldn't happen. Log a waring message for tracking.
klog.Warningf("buildInstanceCache.convertResourceGroupNameToLower failed with error: %v", err)
continue
}

instances = append(instances, cloudprovider.Instance{
Id: "azure://" + resourceID,
Status: instanceStatusFromVM(vm),
})
}

return instances
}

func (scaleSet *ScaleSet) getInstanceByProviderID(providerID string) (cloudprovider.Instance, bool) {
for _, instance := range scaleSet.instanceCache {
if instance.Id == providerID {
return instance, true
}
}
return cloudprovider.Instance{}, false
}

// instanceStatusFromVM converts the VM provisioning state to cloudprovider.InstanceStatus
func instanceStatusFromVM(vm compute.VirtualMachineScaleSetVM) *cloudprovider.InstanceStatus {
if vm.ProvisioningState == nil {
return nil
}

status := &cloudprovider.InstanceStatus{}
switch *vm.ProvisioningState {
case string(compute.ProvisioningStateDeleting):
status.State = cloudprovider.InstanceDeleting
case string(compute.ProvisioningStateCreating):
status.State = cloudprovider.InstanceCreating
default:
status.State = cloudprovider.InstanceRunning
}

return status
}

func (scaleSet *ScaleSet) invalidateInstanceCache() {
Expand Down

0 comments on commit 29943f7

Please sign in to comment.