Skip to content

Commit

Permalink
Merge pull request #3141 from marwanad/avoid-deletion-conflicts
Browse files Browse the repository at this point in the history
Avoid sending extra deletion calls for in-progress deletions
  • Loading branch information
k8s-ci-robot authored Jun 2, 2020
2 parents 0845415 + fbe928c commit c92ca4c
Show file tree
Hide file tree
Showing 4 changed files with 163 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2019-12-01/compute"
"github.com/Azure/azure-sdk-for-go/services/resources/mgmt/2017-05-10/resources"
"github.com/Azure/go-autorest/autorest/azure"
"github.com/Azure/go-autorest/autorest/to"
"github.com/stretchr/testify/assert"

apiv1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -67,7 +68,19 @@ func newTestAzureManager(t *testing.T) *AzureManager {
},
},
},
virtualMachineScaleSetVMsClient: &VirtualMachineScaleSetVMsClientMock{},
virtualMachineScaleSetVMsClient: &VirtualMachineScaleSetVMsClientMock{
FakeStore: map[string]map[string]compute.VirtualMachineScaleSetVM{
"test": {
"0": {
ID: to.StringPtr(fakeVirtualMachineScaleSetVMID),
InstanceID: to.StringPtr("0"),
VirtualMachineScaleSetVMProperties: &compute.VirtualMachineScaleSetVMProperties{
VMID: to.StringPtr("123E4567-E89B-12D3-A456-426655440000"),
},
},
},
},
},
},
}

Expand Down
20 changes: 9 additions & 11 deletions cluster-autoscaler/cloudprovider/azure/azure_fakes.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ func (client *VirtualMachineScaleSetsClientMock) List(ctx context.Context, resou
// VirtualMachineScaleSetVMsClientMock mocks for VirtualMachineScaleSetVMsClient.
type VirtualMachineScaleSetVMsClientMock struct {
mock.Mock
mutex sync.Mutex
FakeStore map[string]map[string]compute.VirtualMachineScaleSetVM
}

// Get gets a VirtualMachineScaleSetVM by VMScaleSetName and instanceID.
Expand All @@ -128,18 +130,14 @@ func (m *VirtualMachineScaleSetVMsClientMock) Get(ctx context.Context, resourceG

// List gets a list of VirtualMachineScaleSetVMs.
func (m *VirtualMachineScaleSetVMsClientMock) List(ctx context.Context, resourceGroupName string, virtualMachineScaleSetName string, expand string) (result []compute.VirtualMachineScaleSetVM, rerr *retry.Error) {
ID := fakeVirtualMachineScaleSetVMID
instanceID := "0"
vmID := "123E4567-E89B-12D3-A456-426655440000"
properties := compute.VirtualMachineScaleSetVMProperties{
VMID: &vmID,
}
result = append(result, compute.VirtualMachineScaleSetVM{
ID: &ID,
InstanceID: &instanceID,
VirtualMachineScaleSetVMProperties: &properties,
})
m.mutex.Lock()
defer m.mutex.Unlock()

if _, ok := m.FakeStore[resourceGroupName]; ok {
for _, v := range m.FakeStore[resourceGroupName] {
result = append(result, v)
}
}
return result, nil
}

Expand Down
103 changes: 74 additions & 29 deletions cluster-autoscaler/cloudprovider/azure/azure_scale_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,9 +321,7 @@ func (scaleSet *ScaleSet) IncreaseSize(delta int) error {
}

// GetScaleSetVms returns list of nodes for the given scale set.
// Note that the list results is not used directly because their resource ID format
// is not consistent with Get results.
func (scaleSet *ScaleSet) GetScaleSetVms() ([]string, *retry.Error) {
func (scaleSet *ScaleSet) GetScaleSetVms() ([]compute.VirtualMachineScaleSetVM, error) {
klog.V(4).Infof("GetScaleSetVms: starts")
ctx, cancel := getContextWithCancel()
defer cancel()
Expand All @@ -336,24 +334,7 @@ func (scaleSet *ScaleSet) GetScaleSetVms() ([]string, *retry.Error) {
return nil, rerr
}

allVMs := make([]string, 0)
for _, vm := range vmList {
// The resource ID is empty string, which indicates the instance may be in deleting state.
if len(*vm.ID) == 0 {
continue
}

resourceID, err := convertResourceGroupNameToLower(*vm.ID)
if err != nil {
// This shouldn't happen. Log a waring message for tracking.
klog.Warningf("GetScaleSetVms.convertResourceGroupNameToLower failed with error: %v", err)
continue
}

allVMs = append(allVMs, resourceID)
}

return allVMs, nil
return vmList, nil
}

// DecreaseTargetSize decreases the target size of the node group. This function
Expand Down Expand Up @@ -406,6 +387,9 @@ func (scaleSet *ScaleSet) DeleteInstances(instances []*azureRef) error {
return err
}

scaleSet.instanceMutex.Lock()
defer scaleSet.instanceMutex.Unlock()

instanceIDs := []string{}
for _, instance := range instances {
asg, err := scaleSet.manager.GetAsgForInstance(instance)
Expand All @@ -417,6 +401,11 @@ func (scaleSet *ScaleSet) DeleteInstances(instances []*azureRef) error {
return fmt.Errorf("cannot delete instance (%s) which don't belong to the same Scale Set (%q)", instance.Name, commonAsg)
}

if cpi, found := scaleSet.getInstanceByProviderID(instance.Name); found && cpi.Status != nil && cpi.Status.State == cloudprovider.InstanceDeleting {
klog.V(3).Infof("Skipping deleting instance %s as its current state is deleting", instance.Name)
continue
}

instanceID, err := getLastSegment(instance.Name)
if err != nil {
klog.Errorf("getLastSegment failed with error: %v", err)
Expand All @@ -426,9 +415,16 @@ func (scaleSet *ScaleSet) DeleteInstances(instances []*azureRef) error {
instanceIDs = append(instanceIDs, instanceID)
}

// nothing to delete
if len(instanceIDs) == 0 {
klog.V(3).Infof("No new instances eligible for deletion, skipping")
return nil
}

requiredIds := &compute.VirtualMachineScaleSetVMInstanceRequiredIDs{
InstanceIds: &instanceIDs,
}

ctx, cancel := getContextWithCancel()
defer cancel()
resourceGroup := scaleSet.manager.config.ResourceGroup
Expand Down Expand Up @@ -682,16 +678,65 @@ func (scaleSet *ScaleSet) Nodes() ([]cloudprovider.Instance, error) {
return nil, rerr.Error()
}

instances := make([]cloudprovider.Instance, len(vms))
for i := range vms {
name := "azure://" + vms[i]
instances[i] = cloudprovider.Instance{Id: name}
}

scaleSet.instanceCache = instances
scaleSet.instanceCache = buildInstanceCache(vms)
scaleSet.lastInstanceRefresh = time.Now()
klog.V(4).Infof("Nodes: returns")
return instances, nil
return scaleSet.instanceCache, nil
}

// Note that the GetScaleSetVms() results is not used directly because for the List endpoint,
// their resource ID format is not consistent with Get endpoint
func buildInstanceCache(vms []compute.VirtualMachineScaleSetVM) []cloudprovider.Instance {
instances := []cloudprovider.Instance{}

for _, vm := range vms {
// The resource ID is empty string, which indicates the instance may be in deleting state.
if len(*vm.ID) == 0 {
continue
}

resourceID, err := convertResourceGroupNameToLower(*vm.ID)
if err != nil {
// This shouldn't happen. Log a waring message for tracking.
klog.Warningf("buildInstanceCache.convertResourceGroupNameToLower failed with error: %v", err)
continue
}

instances = append(instances, cloudprovider.Instance{
Id: "azure://" + resourceID,
Status: instanceStatusFromVM(vm),
})
}

return instances
}

func (scaleSet *ScaleSet) getInstanceByProviderID(providerID string) (cloudprovider.Instance, bool) {
for _, instance := range scaleSet.instanceCache {
if instance.Id == providerID {
return instance, true
}
}
return cloudprovider.Instance{}, false
}

// instanceStatusFromVM converts the VM provisioning state to cloudprovider.InstanceStatus
func instanceStatusFromVM(vm compute.VirtualMachineScaleSetVM) *cloudprovider.InstanceStatus {
if vm.ProvisioningState == nil {
return nil
}

status := &cloudprovider.InstanceStatus{}
switch *vm.ProvisioningState {
case string(compute.ProvisioningStateDeleting):
status.State = cloudprovider.InstanceDeleting
case string(compute.ProvisioningStateCreating):
status.State = cloudprovider.InstanceCreating
default:
status.State = cloudprovider.InstanceRunning
}

return status
}

func (scaleSet *ScaleSet) invalidateInstanceCache() {
Expand Down
66 changes: 66 additions & 0 deletions cluster-autoscaler/cloudprovider/azure/azure_scale_set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,72 @@ func TestDeleteNodes(t *testing.T) {
scaleSetClient.AssertNumberOfCalls(t, "DeleteInstances", 1)
}

func TestDeleteNoConflictRequest(t *testing.T) {
vmssName := "test-asg"
var vmssCapacity int64 = 3

manager := newTestAzureManager(t)
vmsClient := &VirtualMachineScaleSetVMsClientMock{
FakeStore: map[string]map[string]compute.VirtualMachineScaleSetVM{
"test": {
"0": {
ID: to.StringPtr(fakeVirtualMachineScaleSetVMID),
InstanceID: to.StringPtr("0"),
VirtualMachineScaleSetVMProperties: &compute.VirtualMachineScaleSetVMProperties{
VMID: to.StringPtr("123E4567-E89B-12D3-A456-426655440000"),
ProvisioningState: to.StringPtr("Deleting"),
},
},
},
},
}

scaleSetClient := &VirtualMachineScaleSetsClientMock{
FakeStore: map[string]map[string]compute.VirtualMachineScaleSet{
"test": {
"test-asg": {
Name: &vmssName,
Sku: &compute.Sku{
Capacity: &vmssCapacity,
},
},
},
},
}

response := autorest.Response{
Response: &http.Response{
Status: "OK",
},
}

scaleSetClient.On("DeleteInstances", mock.Anything, "test-asg", mock.Anything, mock.Anything).Return(response, nil)
manager.azClient.virtualMachineScaleSetsClient = scaleSetClient
manager.azClient.virtualMachineScaleSetVMsClient = vmsClient

resourceLimiter := cloudprovider.NewResourceLimiter(
map[string]int64{cloudprovider.ResourceNameCores: 1, cloudprovider.ResourceNameMemory: 10000000},
map[string]int64{cloudprovider.ResourceNameCores: 10, cloudprovider.ResourceNameMemory: 100000000})
provider, err := BuildAzureCloudProvider(manager, resourceLimiter)
assert.NoError(t, err)

registered := manager.RegisterAsg(newTestScaleSet(manager, "test-asg"))
assert.True(t, registered)

node := &apiv1.Node{
Spec: apiv1.NodeSpec{
ProviderID: "azure://" + fakeVirtualMachineScaleSetVMID,
},
}

scaleSet, ok := provider.NodeGroups()[0].(*ScaleSet)
assert.True(t, ok)

err = scaleSet.DeleteNodes([]*apiv1.Node{node})
// ensure that DeleteInstances isn't called
scaleSetClient.AssertNumberOfCalls(t, "DeleteInstances", 0)
}

func TestId(t *testing.T) {
provider := newTestProvider(t)
registered := provider.azureManager.RegisterAsg(
Expand Down

0 comments on commit c92ca4c

Please sign in to comment.