Avoid sending extra deletion calls for in-progress deletions #3141

Merged — 2 commits, merged on Jun 2, 2020
@@ -22,6 +22,7 @@ import (
"github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2019-12-01/compute"
"github.com/Azure/azure-sdk-for-go/services/resources/mgmt/2017-05-10/resources"
"github.com/Azure/go-autorest/autorest/azure"
"github.com/Azure/go-autorest/autorest/to"
"github.com/stretchr/testify/assert"

apiv1 "k8s.io/api/core/v1"
@@ -67,7 +68,19 @@ func newTestAzureManager(t *testing.T) *AzureManager {
},
},
},
virtualMachineScaleSetVMsClient: &VirtualMachineScaleSetVMsClientMock{},
virtualMachineScaleSetVMsClient: &VirtualMachineScaleSetVMsClientMock{
FakeStore: map[string]map[string]compute.VirtualMachineScaleSetVM{
"test": {
"0": {
ID: to.StringPtr(fakeVirtualMachineScaleSetVMID),
InstanceID: to.StringPtr("0"),
VirtualMachineScaleSetVMProperties: &compute.VirtualMachineScaleSetVMProperties{
VMID: to.StringPtr("123E4567-E89B-12D3-A456-426655440000"),
},
},
},
},
},
},
}

20 changes: 9 additions & 11 deletions cluster-autoscaler/cloudprovider/azure/azure_fakes.go
@@ -110,6 +110,8 @@ func (client *VirtualMachineScaleSetsClientMock) List(ctx context.Context, resou
// VirtualMachineScaleSetVMsClientMock mocks for VirtualMachineScaleSetVMsClient.
type VirtualMachineScaleSetVMsClientMock struct {
mock.Mock
mutex sync.Mutex
FakeStore map[string]map[string]compute.VirtualMachineScaleSetVM
}

// Get gets a VirtualMachineScaleSetVM by VMScaleSetName and instanceID.
@@ -128,18 +130,14 @@ func (m *VirtualMachineScaleSetVMsClientMock) Get(ctx context.Context, resourceG

// List gets a list of VirtualMachineScaleSetVMs.
func (m *VirtualMachineScaleSetVMsClientMock) List(ctx context.Context, resourceGroupName string, virtualMachineScaleSetName string, expand string) (result []compute.VirtualMachineScaleSetVM, rerr *retry.Error) {
ID := fakeVirtualMachineScaleSetVMID
instanceID := "0"
vmID := "123E4567-E89B-12D3-A456-426655440000"
properties := compute.VirtualMachineScaleSetVMProperties{
VMID: &vmID,
}
result = append(result, compute.VirtualMachineScaleSetVM{
ID: &ID,
InstanceID: &instanceID,
VirtualMachineScaleSetVMProperties: &properties,
})
m.mutex.Lock()
defer m.mutex.Unlock()

if _, ok := m.FakeStore[resourceGroupName]; ok {
for _, v := range m.FakeStore[resourceGroupName] {
result = append(result, v)
}
}
return result, nil
}

103 changes: 74 additions & 29 deletions cluster-autoscaler/cloudprovider/azure/azure_scale_set.go
@@ -321,9 +321,7 @@ func (scaleSet *ScaleSet) IncreaseSize(delta int) error {
}

// GetScaleSetVms returns list of nodes for the given scale set.
// Note that the list results is not used directly because their resource ID format
// is not consistent with Get results.
func (scaleSet *ScaleSet) GetScaleSetVms() ([]string, *retry.Error) {
func (scaleSet *ScaleSet) GetScaleSetVms() ([]compute.VirtualMachineScaleSetVM, *retry.Error) {
klog.V(4).Infof("GetScaleSetVms: starts")
ctx, cancel := getContextWithCancel()
defer cancel()
@@ -336,24 +334,7 @@ func (scaleSet *ScaleSet) GetScaleSetVms() ([]string, *retry.Error) {
return nil, rerr
}

allVMs := make([]string, 0)
for _, vm := range vmList {
// The resource ID is empty string, which indicates the instance may be in deleting state.
if len(*vm.ID) == 0 {
continue
}

resourceID, err := convertResourceGroupNameToLower(*vm.ID)
if err != nil {
// This shouldn't happen. Log a warning message for tracking.
klog.Warningf("GetScaleSetVms.convertResourceGroupNameToLower failed with error: %v", err)
continue
}

allVMs = append(allVMs, resourceID)
}

return allVMs, nil
return vmList, nil
}

// DecreaseTargetSize decreases the target size of the node group. This function
@@ -406,6 +387,9 @@ func (scaleSet *ScaleSet) DeleteInstances(instances []*azureRef) error {
return err
}

scaleSet.instanceMutex.Lock()
defer scaleSet.instanceMutex.Unlock()

instanceIDs := []string{}
for _, instance := range instances {
asg, err := scaleSet.manager.GetAsgForInstance(instance)
@@ -417,6 +401,11 @@
return fmt.Errorf("cannot delete instance (%s) which don't belong to the same Scale Set (%q)", instance.Name, commonAsg)
}

if cpi, found := scaleSet.getInstanceByProviderID(instance.Name); found && cpi.Status != nil && cpi.Status.State == cloudprovider.InstanceDeleting {
klog.V(3).Infof("Skipping deleting instance %s as its current state is deleting", instance.Name)
continue
}

instanceID, err := getLastSegment(instance.Name)
if err != nil {
klog.Errorf("getLastSegment failed with error: %v", err)
@@ -426,9 +415,16 @@ func (scaleSet *ScaleSet) DeleteInstances(instances []*azureRef) error {
instanceIDs = append(instanceIDs, instanceID)
}

// nothing to delete
if len(instanceIDs) == 0 {
klog.V(3).Infof("No new instances eligible for deletion, skipping")
return nil
}

requiredIds := &compute.VirtualMachineScaleSetVMInstanceRequiredIDs{
InstanceIds: &instanceIDs,
}

ctx, cancel := getContextWithCancel()
defer cancel()
resourceGroup := scaleSet.manager.config.ResourceGroup
@@ -682,16 +678,65 @@ func (scaleSet *ScaleSet) Nodes() ([]cloudprovider.Instance, error) {
return nil, rerr.Error()
}

instances := make([]cloudprovider.Instance, len(vms))
for i := range vms {
name := "azure://" + vms[i]
instances[i] = cloudprovider.Instance{Id: name}
}

scaleSet.instanceCache = instances
scaleSet.instanceCache = buildInstanceCache(vms)
scaleSet.lastInstanceRefresh = time.Now()
klog.V(4).Infof("Nodes: returns")
return instances, nil
return scaleSet.instanceCache, nil
}

// Note that the GetScaleSetVms() results are not used directly because the resource ID format
// returned by the List endpoint is not consistent with that of the Get endpoint.
func buildInstanceCache(vms []compute.VirtualMachineScaleSetVM) []cloudprovider.Instance {
instances := []cloudprovider.Instance{}

for _, vm := range vms {
// The resource ID is an empty string, which indicates the instance may be in a deleting state.
if len(*vm.ID) == 0 {
continue
}

resourceID, err := convertResourceGroupNameToLower(*vm.ID)
if err != nil {
// This shouldn't happen. Log a warning message for tracking.
klog.Warningf("buildInstanceCache.convertResourceGroupNameToLower failed with error: %v", err)
continue
}

instances = append(instances, cloudprovider.Instance{
Id: "azure://" + resourceID,
Status: instanceStatusFromVM(vm),
})
}

return instances
}

func (scaleSet *ScaleSet) getInstanceByProviderID(providerID string) (cloudprovider.Instance, bool) {
for _, instance := range scaleSet.instanceCache {
if instance.Id == providerID {
return instance, true
}
}
return cloudprovider.Instance{}, false
}

// instanceStatusFromVM converts the VM provisioning state to cloudprovider.InstanceStatus
func instanceStatusFromVM(vm compute.VirtualMachineScaleSetVM) *cloudprovider.InstanceStatus {
if vm.ProvisioningState == nil {
return nil
}

status := &cloudprovider.InstanceStatus{}
switch *vm.ProvisioningState {
case string(compute.ProvisioningStateDeleting):
status.State = cloudprovider.InstanceDeleting
case string(compute.ProvisioningStateCreating):
status.State = cloudprovider.InstanceCreating
default:
status.State = cloudprovider.InstanceRunning
Inline review comment from the PR author:
We default to Running here. There's no action taken by core CA on those states, except for InstanceCreating cases that also populate status.Error.

We can look into utilizing instanceView to surface CSE failures or OS provisioning errors and get CA to delete those VMs faster than waiting for maxNodeProvisionTime.

}

return status
}

func (scaleSet *ScaleSet) invalidateInstanceCache() {
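Regarding the author's inline comment above about using instanceView to surface CSE or OS provisioning failures: the following is a minimal, hypothetical sketch of how such a failure could be reported through cloudprovider.InstanceStatus so that core CA could recycle the VM before maxNodeProvisionTime expires. The function name, the "Failed" provisioning-state string, and the error code are illustrative assumptions and are not part of this PR.

package azure

import (
	"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
)

// hypotheticalStatusWithError sketches how a provisioning failure could be
// attached to an instance status. Illustrative only, not part of this change:
// the "Failed" case, the error class, and the error code are assumptions
// about how instanceView data might be mapped.
func hypotheticalStatusWithError(provisioningState, detail string) *cloudprovider.InstanceStatus {
	status := &cloudprovider.InstanceStatus{}
	switch provisioningState {
	case "Deleting":
		status.State = cloudprovider.InstanceDeleting
	case "Creating":
		status.State = cloudprovider.InstanceCreating
	case "Failed":
		// Keep the instance in Creating state but attach error details, so
		// core CA can delete and recreate it instead of waiting out the
		// provision timeout.
		status.State = cloudprovider.InstanceCreating
		status.ErrorInfo = &cloudprovider.InstanceErrorInfo{
			ErrorClass:   cloudprovider.OtherErrorClass,
			ErrorCode:    "provisioning-failed",
			ErrorMessage: detail,
		}
	default:
		status.State = cloudprovider.InstanceRunning
	}
	return status
}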
66 changes: 66 additions & 0 deletions cluster-autoscaler/cloudprovider/azure/azure_scale_set_test.go
@@ -226,6 +226,72 @@ func TestDeleteNodes(t *testing.T) {
scaleSetClient.AssertNumberOfCalls(t, "DeleteInstances", 1)
}

func TestDeleteNoConflictRequest(t *testing.T) {
vmssName := "test-asg"
var vmssCapacity int64 = 3

manager := newTestAzureManager(t)
vmsClient := &VirtualMachineScaleSetVMsClientMock{
FakeStore: map[string]map[string]compute.VirtualMachineScaleSetVM{
"test": {
"0": {
ID: to.StringPtr(fakeVirtualMachineScaleSetVMID),
InstanceID: to.StringPtr("0"),
VirtualMachineScaleSetVMProperties: &compute.VirtualMachineScaleSetVMProperties{
VMID: to.StringPtr("123E4567-E89B-12D3-A456-426655440000"),
ProvisioningState: to.StringPtr("Deleting"),
},
},
},
},
}

scaleSetClient := &VirtualMachineScaleSetsClientMock{
FakeStore: map[string]map[string]compute.VirtualMachineScaleSet{
"test": {
"test-asg": {
Name: &vmssName,
Sku: &compute.Sku{
Capacity: &vmssCapacity,
},
},
},
},
}

response := autorest.Response{
Response: &http.Response{
Status: "OK",
},
}

scaleSetClient.On("DeleteInstances", mock.Anything, "test-asg", mock.Anything, mock.Anything).Return(response, nil)
manager.azClient.virtualMachineScaleSetsClient = scaleSetClient
manager.azClient.virtualMachineScaleSetVMsClient = vmsClient

resourceLimiter := cloudprovider.NewResourceLimiter(
map[string]int64{cloudprovider.ResourceNameCores: 1, cloudprovider.ResourceNameMemory: 10000000},
map[string]int64{cloudprovider.ResourceNameCores: 10, cloudprovider.ResourceNameMemory: 100000000})
provider, err := BuildAzureCloudProvider(manager, resourceLimiter)
assert.NoError(t, err)

registered := manager.RegisterAsg(newTestScaleSet(manager, "test-asg"))
assert.True(t, registered)

node := &apiv1.Node{
Spec: apiv1.NodeSpec{
ProviderID: "azure://" + fakeVirtualMachineScaleSetVMID,
},
}

scaleSet, ok := provider.NodeGroups()[0].(*ScaleSet)
assert.True(t, ok)

err = scaleSet.DeleteNodes([]*apiv1.Node{node})
// ensure that DeleteInstances isn't called
scaleSetClient.AssertNumberOfCalls(t, "DeleteInstances", 0)
}

func TestId(t *testing.T) {
provider := newTestProvider(t)
registered := provider.azureManager.RegisterAsg(