Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cherry-pick #3141: Avoid sending extra deletion calls for in-progress deletions #3179

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2018-10-01/compute"
"github.com/Azure/azure-sdk-for-go/services/resources/mgmt/2017-05-10/resources"
"github.com/Azure/go-autorest/autorest/azure"
"github.com/Azure/go-autorest/autorest/to"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"

Expand Down Expand Up @@ -79,8 +80,34 @@ func newTestAzureManager(t *testing.T) *AzureManager {
virtualMachinesClient: &VirtualMachinesClientMock{
FakeStore: make(map[string]map[string]compute.VirtualMachine),
},
virtualMachineScaleSetsClient: scaleSetsClient,
virtualMachineScaleSetVMsClient: &VirtualMachineScaleSetVMsClientMock{},
virtualMachineScaleSetsClient: &VirtualMachineScaleSetsClientMock{
FakeStore: map[string]map[string]compute.VirtualMachineScaleSet{
"test": {
"test-asg": {
Name: &vmssName,
Sku: &compute.Sku{
Capacity: &vmssCapacity,
Name: &skuName,
},
VirtualMachineScaleSetProperties: &compute.VirtualMachineScaleSetProperties{},
Location: &location,
},
},
},
},
virtualMachineScaleSetVMsClient: &VirtualMachineScaleSetVMsClientMock{
FakeStore: map[string]map[string]compute.VirtualMachineScaleSetVM{
"test": {
"0": {
ID: to.StringPtr(fakeVirtualMachineScaleSetVMID),
InstanceID: to.StringPtr("0"),
VirtualMachineScaleSetVMProperties: &compute.VirtualMachineScaleSetVMProperties{
VMID: to.StringPtr("123E4567-E89B-12D3-A456-426655440000"),
},
},
},
},
},
},
}

Expand Down
23 changes: 11 additions & 12 deletions cluster-autoscaler/cloudprovider/azure/azure_fakes.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/Azure/azure-sdk-for-go/services/resources/mgmt/2017-05-10/resources"
"github.com/Azure/azure-sdk-for-go/services/storage/mgmt/2018-07-01/storage"
"github.com/Azure/go-autorest/autorest"
"github.com/avast/retry-go"
"github.com/stretchr/testify/mock"
)

Expand Down Expand Up @@ -111,6 +112,8 @@ func (client *VirtualMachineScaleSetsClientMock) List(ctx context.Context, resou
// VirtualMachineScaleSetVMsClientMock mocks for VirtualMachineScaleSetVMsClient.
type VirtualMachineScaleSetVMsClientMock struct {
mock.Mock
mutex sync.Mutex
FakeStore map[string]map[string]compute.VirtualMachineScaleSetVM
}

// Get gets a VirtualMachineScaleSetVM by VMScaleSetName and instanceID.
Expand All @@ -128,19 +131,15 @@ func (m *VirtualMachineScaleSetVMsClientMock) Get(ctx context.Context, resourceG
}

// List gets a list of VirtualMachineScaleSetVMs.
func (m *VirtualMachineScaleSetVMsClientMock) List(ctx context.Context, resourceGroupName string, virtualMachineScaleSetName string, filter string, selectParameter string, expand string) (result []compute.VirtualMachineScaleSetVM, err error) {
ID := fakeVirtualMachineScaleSetVMID
instanceID := "0"
vmID := "123E4567-E89B-12D3-A456-426655440000"
properties := compute.VirtualMachineScaleSetVMProperties{
VMID: &vmID,
}
result = append(result, compute.VirtualMachineScaleSetVM{
ID: &ID,
InstanceID: &instanceID,
VirtualMachineScaleSetVMProperties: &properties,
})
func (m *VirtualMachineScaleSetVMsClientMock) List(ctx context.Context, resourceGroupName string, virtualMachineScaleSetName string, expand string) (result []compute.VirtualMachineScaleSetVM, rerr *retry.Error) {
m.mutex.Lock()
defer m.mutex.Unlock()

if _, ok := m.FakeStore[resourceGroupName]; ok {
for _, v := range m.FakeStore[resourceGroupName] {
result = append(result, v)
}
}
return result, nil
}

Expand Down
103 changes: 74 additions & 29 deletions cluster-autoscaler/cloudprovider/azure/azure_scale_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,9 +322,7 @@ func (scaleSet *ScaleSet) IncreaseSize(delta int) error {
}

// GetScaleSetVms returns list of nodes for the given scale set.
// Note that the list results is not used directly because their resource ID format
// is not consistent with Get results.
func (scaleSet *ScaleSet) GetScaleSetVms() ([]string, error) {
func (scaleSet *ScaleSet) GetScaleSetVms() ([]compute.VirtualMachineScaleSetVM, error) {
klog.V(4).Infof("GetScaleSetVms: starts")
ctx, cancel := getContextWithCancel()
defer cancel()
Expand All @@ -337,24 +335,7 @@ func (scaleSet *ScaleSet) GetScaleSetVms() ([]string, error) {
return nil, err
}

allVMs := make([]string, 0)
for _, vm := range vmList {
// The resource ID is empty string, which indicates the instance may be in deleting state.
if len(*vm.ID) == 0 {
continue
}

resourceID, err := convertResourceGroupNameToLower(*vm.ID)
if err != nil {
// This shouldn't happen. Log a waring message for tracking.
klog.Warningf("GetScaleSetVms.convertResourceGroupNameToLower failed with error: %v", err)
continue
}

allVMs = append(allVMs, resourceID)
}

return allVMs, nil
return vmList, nil
}

// DecreaseTargetSize decreases the target size of the node group. This function
Expand Down Expand Up @@ -407,6 +388,9 @@ func (scaleSet *ScaleSet) DeleteInstances(instances []*azureRef) error {
return err
}

scaleSet.instanceMutex.Lock()
defer scaleSet.instanceMutex.Unlock()

instanceIDs := []string{}
for _, instance := range instances {
asg, err := scaleSet.manager.GetAsgForInstance(instance)
Expand All @@ -418,6 +402,11 @@ func (scaleSet *ScaleSet) DeleteInstances(instances []*azureRef) error {
return fmt.Errorf("cannot delete instance (%s) which don't belong to the same Scale Set (%q)", instance.Name, commonAsg)
}

if cpi, found := scaleSet.getInstanceByProviderID(instance.Name); found && cpi.Status != nil && cpi.Status.State == cloudprovider.InstanceDeleting {
klog.V(3).Infof("Skipping deleting instance %s as its current state is deleting", instance.Name)
continue
}

instanceID, err := getLastSegment(instance.Name)
if err != nil {
klog.Errorf("getLastSegment failed with error: %v", err)
Expand All @@ -427,9 +416,16 @@ func (scaleSet *ScaleSet) DeleteInstances(instances []*azureRef) error {
instanceIDs = append(instanceIDs, instanceID)
}

// nothing to delete
if len(instanceIDs) == 0 {
klog.V(3).Infof("No new instances eligible for deletion, skipping")
return nil
}

requiredIds := &compute.VirtualMachineScaleSetVMInstanceRequiredIDs{
InstanceIds: &instanceIDs,
}

ctx, cancel := getContextWithCancel()
defer cancel()
resourceGroup := scaleSet.manager.config.ResourceGroup
Expand Down Expand Up @@ -667,16 +663,65 @@ func (scaleSet *ScaleSet) Nodes() ([]cloudprovider.Instance, error) {
return nil, err
}

instances := make([]cloudprovider.Instance, len(vms))
for i := range vms {
name := "azure://" + vms[i]
instances[i] = cloudprovider.Instance{Id: name}
}

scaleSet.instanceCache = instances
scaleSet.instanceCache = buildInstanceCache(vms)
scaleSet.lastInstanceRefresh = time.Now()
klog.V(4).Infof("Nodes: returns")
return instances, nil
return scaleSet.instanceCache, nil
}

// Note that the GetScaleSetVms() results is not used directly because for the List endpoint,
// their resource ID format is not consistent with Get endpoint
func buildInstanceCache(vms []compute.VirtualMachineScaleSetVM) []cloudprovider.Instance {
instances := []cloudprovider.Instance{}

for _, vm := range vms {
// The resource ID is empty string, which indicates the instance may be in deleting state.
if len(*vm.ID) == 0 {
continue
}

resourceID, err := convertResourceGroupNameToLower(*vm.ID)
if err != nil {
// This shouldn't happen. Log a waring message for tracking.
klog.Warningf("buildInstanceCache.convertResourceGroupNameToLower failed with error: %v", err)
continue
}

instances = append(instances, cloudprovider.Instance{
Id: "azure://" + resourceID,
Status: instanceStatusFromVM(vm),
})
}

return instances
}

func (scaleSet *ScaleSet) getInstanceByProviderID(providerID string) (cloudprovider.Instance, bool) {
for _, instance := range scaleSet.instanceCache {
if instance.Id == providerID {
return instance, true
}
}
return cloudprovider.Instance{}, false
}

// instanceStatusFromVM converts the VM provisioning state to cloudprovider.InstanceStatus
func instanceStatusFromVM(vm compute.VirtualMachineScaleSetVM) *cloudprovider.InstanceStatus {
if vm.ProvisioningState == nil {
return nil
}

status := &cloudprovider.InstanceStatus{}
switch *vm.ProvisioningState {
case string(compute.ProvisioningStateDeleting):
status.State = cloudprovider.InstanceDeleting
case string(compute.ProvisioningStateCreating):
status.State = cloudprovider.InstanceCreating
default:
status.State = cloudprovider.InstanceRunning
}

return status
}

func (scaleSet *ScaleSet) invalidateInstanceCache() {
Expand Down
66 changes: 66 additions & 0 deletions cluster-autoscaler/cloudprovider/azure/azure_scale_set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,72 @@ func TestDeleteNodes(t *testing.T) {
scaleSetClient.AssertNumberOfCalls(t, "DeleteInstances", 1)
}

func TestDeleteNoConflictRequest(t *testing.T) {
vmssName := "test-asg"
var vmssCapacity int64 = 3

manager := newTestAzureManager(t)
vmsClient := &VirtualMachineScaleSetVMsClientMock{
FakeStore: map[string]map[string]compute.VirtualMachineScaleSetVM{
"test": {
"0": {
ID: to.StringPtr(fakeVirtualMachineScaleSetVMID),
InstanceID: to.StringPtr("0"),
VirtualMachineScaleSetVMProperties: &compute.VirtualMachineScaleSetVMProperties{
VMID: to.StringPtr("123E4567-E89B-12D3-A456-426655440000"),
ProvisioningState: to.StringPtr("Deleting"),
},
},
},
},
}

scaleSetClient := &VirtualMachineScaleSetsClientMock{
FakeStore: map[string]map[string]compute.VirtualMachineScaleSet{
"test": {
"test-asg": {
Name: &vmssName,
Sku: &compute.Sku{
Capacity: &vmssCapacity,
},
},
},
},
}

response := autorest.Response{
Response: &http.Response{
Status: "OK",
},
}

scaleSetClient.On("DeleteInstances", mock.Anything, "test-asg", mock.Anything, mock.Anything).Return(response, nil)
manager.azClient.virtualMachineScaleSetsClient = scaleSetClient
manager.azClient.virtualMachineScaleSetVMsClient = vmsClient

resourceLimiter := cloudprovider.NewResourceLimiter(
map[string]int64{cloudprovider.ResourceNameCores: 1, cloudprovider.ResourceNameMemory: 10000000},
map[string]int64{cloudprovider.ResourceNameCores: 10, cloudprovider.ResourceNameMemory: 100000000})
provider, err := BuildAzureCloudProvider(manager, resourceLimiter)
assert.NoError(t, err)

registered := manager.RegisterAsg(newTestScaleSet(manager, "test-asg"))
assert.True(t, registered)

node := &apiv1.Node{
Spec: apiv1.NodeSpec{
ProviderID: "azure://" + fakeVirtualMachineScaleSetVMID,
},
}

scaleSet, ok := provider.NodeGroups()[0].(*ScaleSet)
assert.True(t, ok)

err = scaleSet.DeleteNodes([]*apiv1.Node{node})
// ensure that DeleteInstances isn't called
scaleSetClient.AssertNumberOfCalls(t, "DeleteInstances", 0)
}

func TestId(t *testing.T) {
provider := newTestProvider(t)
registered := provider.azureManager.RegisterAsg(
Expand Down