Merge pull request #64 from DataDog/mayeul/cherry-pick/add-extended-resource-support-in-gce

Cherry-pick: add extended resource support in GCE
zaymat authored Oct 25, 2022
2 parents 8448683 + 433cce6 commit b447a03
Showing 2 changed files with 180 additions and 4 deletions.
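For context, a minimal sketch of how the new extended_resources kube-env key flows through this change. It is not part of the diff below: it assumes it lives in the gce package (where apiv1 and klog are already imported), and the function name exampleExtendedResources is hypothetical. The kube-env string mirrors the format exercised in the new tests.

// Hypothetical illustration, not part of this commit.
func exampleExtendedResources() {
	// extended_resources is declared inside AUTOSCALER_ENV_VARS as a
	// comma-separated key=value list, as in the tests added below.
	kubeEnv := "AUTOSCALER_ENV_VARS: os=linux;os_distribution=cos;" +
		"extended_resources=someResource=2,anotherResource=1G\n"

	// The new helper parses the entry into an apiv1.ResourceList;
	// invalid quantities are skipped with a warning.
	extendedResources, err := extractExtendedResourcesFromKubeEnv(kubeEnv)
	if err != nil {
		// Extended resources are optional, so failures are only logged.
		klog.Errorf("could not parse extended resources: %v", err)
	}

	// BuildCapacity copies each entry into the node capacity, so the node
	// template will advertise someResource=2 and anotherResource=1G.
	_ = extendedResources
}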
42 changes: 40 additions & 2 deletions cluster-autoscaler/cloudprovider/gce/templates.go
@@ -60,7 +60,7 @@ func (t *GceTemplateBuilder) getAcceleratorCount(accelerators []*gce.Accelerator

// BuildCapacity builds a list of resource capacities given list of hardware.
func (t *GceTemplateBuilder) BuildCapacity(cpu int64, mem int64, accelerators []*gce.AcceleratorConfig, os OperatingSystem, osDistribution OperatingSystemDistribution, arch SystemArchitecture,
ephemeralStorage int64, ephemeralStorageLocalSSDCount int64, pods *int64, version string, r OsReservedCalculator) (apiv1.ResourceList, error) {
ephemeralStorage int64, ephemeralStorageLocalSSDCount int64, pods *int64, version string, r OsReservedCalculator, extendedResources apiv1.ResourceList) (apiv1.ResourceList, error) {
capacity := apiv1.ResourceList{}
if pods == nil {
capacity[apiv1.ResourcePods] = *resource.NewQuantity(110, resource.DecimalSI)
@@ -86,6 +86,10 @@ func (t *GceTemplateBuilder) BuildCapacity(cpu int64, mem int64, accelerators []
capacity[apiv1.ResourceEphemeralStorage] = *resource.NewQuantity(int64(math.Max(float64(storageTotal), 0)), resource.DecimalSI)
}

for resourceName, quantity := range extendedResources {
capacity[resourceName] = quantity
}

return capacity, nil
}

@@ -191,10 +195,17 @@ func (t *GceTemplateBuilder) BuildNodeFromTemplate(mig Mig, template *gce.Instan
return nil, fmt.Errorf("could not fetch ephemeral storage from instance template: %v", err)
}

capacity, err := t.BuildCapacity(cpu, mem, template.Properties.GuestAccelerators, os, osDistribution, arch, ephemeralStorage, ssdCount, pods, mig.Version(), reserved)
extendedResources, err := extractExtendedResourcesFromKubeEnv(kubeEnvValue)
if err != nil {
// Extended resources are optional and should not break the template creation
klog.Errorf("could not fetch extended resources from instance template: %v", err)
}

capacity, err := t.BuildCapacity(cpu, mem, template.Properties.GuestAccelerators, os, osDistribution, arch, ephemeralStorage, ssdCount, pods, mig.Version(), reserved, extendedResources)
if err != nil {
return nil, err
}

node.Status = apiv1.NodeStatus{
Capacity: capacity,
}
@@ -437,6 +448,33 @@ func extractKubeReservedFromKubeEnv(kubeEnv string) (string, error) {
return kubeReserved, nil
}

func extractExtendedResourcesFromKubeEnv(kubeEnvValue string) (apiv1.ResourceList, error) {
extendedResourcesAsString, found, err := extractAutoscalerVarFromKubeEnv(kubeEnvValue, "extended_resources")
if err != nil {
klog.Warning("error while obtaining extended_resources from AUTOSCALER_ENV_VARS; %v", err)
return nil, err
}

if !found {
return apiv1.ResourceList{}, nil
}

extendedResourcesMap, err := parseKeyValueListToMap(extendedResourcesAsString)
if err != nil {
return apiv1.ResourceList{}, err
}

extendedResources := apiv1.ResourceList{}
for name, quantity := range extendedResourcesMap {
if q, err := resource.ParseQuantity(quantity); err == nil && q.Sign() >= 0 {
extendedResources[apiv1.ResourceName(name)] = q
} else if err != nil {
klog.Warning("ignoring invalid value in extended_resources defined in AUTOSCALER_ENV_VARS; %v", err)
}
}
return extendedResources, nil
}

// OperatingSystem denotes operating system used by nodes coming from node group
type OperatingSystem string

142 changes: 140 additions & 2 deletions cluster-autoscaler/cloudprovider/gce/templates_test.go
@@ -61,6 +61,7 @@ func TestBuildNodeFromTemplateSetsResources(t *testing.T) {
reservedEphemeralStorage string
isEphemeralStorageBlocked bool
ephemeralStorageLocalSSDCount int64
extendedResources apiv1.ResourceList
// test outputs
expectedErr bool
}
@@ -183,6 +184,37 @@ func TestBuildNodeFromTemplateSetsResources(t *testing.T) {
attachedLocalSSDCount: 4,
expectedErr: false,
},
{
scenario: "extended_resources present in kube-env",
kubeEnv: "AUTOSCALER_ENV_VARS: kube_reserved=cpu=0,memory=0,ephemeral-storage=10Gi;os_distribution=cos;os=linux;ephemeral_storage_local_ssd_count=2;extended_resources=someResource=2,anotherResource=1G\n",
physicalCpu: 8,
physicalMemory: 200 * units.MiB,
ephemeralStorageLocalSSDCount: 2,
kubeReserved: true,
reservedCpu: "0m",
reservedMemory: fmt.Sprintf("%v", 0*units.MiB),
reservedEphemeralStorage: "10Gi",
attachedLocalSSDCount: 4,
expectedErr: false,
extendedResources: apiv1.ResourceList{
apiv1.ResourceName("someResource"): *resource.NewQuantity(2, resource.DecimalSI),
apiv1.ResourceName("anotherResource"): *resource.NewQuantity(1*units.GB, resource.DecimalSI),
},
},
{
scenario: "malformed extended_resources in kube-env",
kubeEnv: "AUTOSCALER_ENV_VARS: kube_reserved=cpu=0,memory=0,ephemeral-storage=10Gi;os_distribution=cos;os=linux;ephemeral_storage_local_ssd_count=2;extended_resources=someResource\n",
physicalCpu: 8,
physicalMemory: 200 * units.MiB,
ephemeralStorageLocalSSDCount: 2,
kubeReserved: true,
reservedCpu: "0m",
reservedMemory: fmt.Sprintf("%v", 0*units.MiB),
reservedEphemeralStorage: "10Gi",
attachedLocalSSDCount: 4,
expectedErr: false,
extendedResources: apiv1.ResourceList{},
},
}
for _, tc := range testCases {
t.Run(tc.scenario, func(t *testing.T) {
@@ -238,7 +270,7 @@ func TestBuildNodeFromTemplateSetsResources(t *testing.T) {
} else if tc.isEphemeralStorageBlocked {
physicalEphemeralStorageGiB = 0
}
capacity, err := tb.BuildCapacity(tc.physicalCpu, tc.physicalMemory, tc.accelerators, OperatingSystemLinux, OperatingSystemDistributionCOS, "", physicalEphemeralStorageGiB*units.GiB, tc.ephemeralStorageLocalSSDCount, tc.pods, "", &GceReserved{})
capacity, err := tb.BuildCapacity(tc.physicalCpu, tc.physicalMemory, tc.accelerators, OperatingSystemLinux, OperatingSystemDistributionCOS, "", physicalEphemeralStorageGiB*units.GiB, tc.ephemeralStorageLocalSSDCount, tc.pods, "", &GceReserved{}, tc.extendedResources)
assert.NoError(t, err)
assertEqualResourceLists(t, "Capacity", capacity, node.Status.Capacity)
if !tc.kubeReserved {
@@ -532,7 +564,7 @@ func TestBuildCapacityMemory(t *testing.T) {
t.Run(fmt.Sprintf("%v", idx), func(t *testing.T) {
tb := GceTemplateBuilder{}
noAccelerators := make([]*gce.AcceleratorConfig, 0)
buildCapacity, err := tb.BuildCapacity(tc.physicalCpu, tc.physicalMemory, noAccelerators, tc.os, OperatingSystemDistributionCOS, "", -1, 0, nil, "", &GceReserved{})
buildCapacity, err := tb.BuildCapacity(tc.physicalCpu, tc.physicalMemory, noAccelerators, tc.os, OperatingSystemDistributionCOS, "", -1, 0, nil, "", &GceReserved{}, apiv1.ResourceList{})
assert.NoError(t, err)
expectedCapacity, err := makeResourceList2(tc.physicalCpu, tc.expectedCapacityMemory, 0, 110)
assert.NoError(t, err)
@@ -1099,6 +1131,112 @@ func TestExtractOperatingSystemDistributionFromKubeEnv(t *testing.T) {
}
}

func TestExtractExtendedResourcesFromKubeEnv(t *testing.T) {
type testCase struct {
name string
kubeEnv string
expectedExtendedResources apiv1.ResourceList
expectedErr bool
}

testCases := []testCase{
{
name: "numeric value",
kubeEnv: "AUTOSCALER_ENV_VARS: node_labels=a=b,c=d,cloud.google.com/gke-nodepool=pool-3,cloud.google.com/gke-preemptible=true;" +
"node_taints='dedicated=ml:NoSchedule,test=dev:PreferNoSchedule,a=b:c';" +
"kube_reserved=cpu=1000m,memory=300000Mi;" +
"extended_resources=foo=10",
expectedExtendedResources: apiv1.ResourceList{
apiv1.ResourceName("foo"): *resource.NewQuantity(10, resource.DecimalSI),
},
expectedErr: false,
},
{
name: "numeric value with quantity suffix",
kubeEnv: "AUTOSCALER_ENV_VARS: node_labels=a=b,c=d,cloud.google.com/gke-nodepool=pool-3,cloud.google.com/gke-preemptible=true;" +
"node_taints='dedicated=ml:NoSchedule,test=dev:PreferNoSchedule,a=b:c';" +
"kube_reserved=cpu=1000m,memory=300000Mi;" +
"extended_resources=foo=10G",
expectedExtendedResources: apiv1.ResourceList{
apiv1.ResourceName("foo"): *resource.NewQuantity(10*units.GB, resource.DecimalSI),
},
expectedErr: false,
},
{
name: "multiple extended_resources definition",
kubeEnv: "AUTOSCALER_ENV_VARS: node_labels=a=b,c=d,cloud.google.com/gke-nodepool=pool-3,cloud.google.com/gke-preemptible=true;" +
"node_taints='dedicated=ml:NoSchedule,test=dev:PreferNoSchedule,a=b:c';" +
"kube_reserved=cpu=1000m,memory=300000Mi;" +
"extended_resources=foo=10G,bar=230",
expectedExtendedResources: apiv1.ResourceList{
apiv1.ResourceName("foo"): *resource.NewQuantity(10*units.GB, resource.DecimalSI),
apiv1.ResourceName("bar"): *resource.NewQuantity(230, resource.DecimalSI),
},
expectedErr: false,
},
{
name: "invalid value",
kubeEnv: "AUTOSCALER_ENV_VARS: node_labels=a=b,c=d,cloud.google.com/gke-nodepool=pool-3,cloud.google.com/gke-preemptible=true;" +
"node_taints='dedicated=ml:NoSchedule,test=dev:PreferNoSchedule,a=b:c';" +
"kube_reserved=cpu=1000m,memory=300000Mi;" +
"extended_resources=foo=bar",
expectedExtendedResources: apiv1.ResourceList{},
expectedErr: false,
},
{
name: "both valid and invalid values",
kubeEnv: "AUTOSCALER_ENV_VARS: node_labels=a=b,c=d,cloud.google.com/gke-nodepool=pool-3,cloud.google.com/gke-preemptible=true;" +
"node_taints='dedicated=ml:NoSchedule,test=dev:PreferNoSchedule,a=b:c';" +
"kube_reserved=cpu=1000m,memory=300000Mi;" +
"extended_resources=foo=bar,baz=10G",
expectedExtendedResources: apiv1.ResourceList{
apiv1.ResourceName("baz"): *resource.NewQuantity(10*units.GB, resource.DecimalSI),
},
expectedErr: false,
},
{
name: "invalid quantity suffix",
kubeEnv: "AUTOSCALER_ENV_VARS: node_labels=a=b,c=d,cloud.google.com/gke-nodepool=pool-3,cloud.google.com/gke-preemptible=true;" +
"node_taints='dedicated=ml:NoSchedule,test=dev:PreferNoSchedule,a=b:c';" +
"kube_reserved=cpu=1000m,memory=300000Mi;" +
"extended_resources=foo=10Wi",
expectedExtendedResources: apiv1.ResourceList{},
expectedErr: false,
},
{
name: "malformed extended_resources map",
kubeEnv: "AUTOSCALER_ENV_VARS: node_labels=a=b,c=d,cloud.google.com/gke-nodepool=pool-3,cloud.google.com/gke-preemptible=true;" +
"node_taints='dedicated=ml:NoSchedule,test=dev:PreferNoSchedule,a=b:c';" +
"kube_reserved=cpu=1000m,memory=300000Mi;" +
"extended_resources=foo",
expectedExtendedResources: apiv1.ResourceList{},
expectedErr: true,
},
{
name: "malformed extended_resources definition",
kubeEnv: "AUTOSCALER_ENV_VARS: node_labels=a=b,c=d,cloud.google.com/gke-nodepool=pool-3,cloud.google.com/gke-preemptible=true;" +
"node_taints='dedicated=ml:NoSchedule,test=dev:PreferNoSchedule,a=b:c';" +
"kube_reserved=cpu=1000m,memory=300000Mi;" +
"extended_resources/",
expectedExtendedResources: apiv1.ResourceList{},
expectedErr: true,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
extendedResources, err := extractExtendedResourcesFromKubeEnv(tc.kubeEnv)
assertEqualResourceLists(t, "Resources", tc.expectedExtendedResources, extendedResources)
if tc.expectedErr {
assert.Error(t, err)
} else {
assert.NoError(t, err)
}
})
}

}

func TestParseKubeReserved(t *testing.T) {
type testCase struct {
reserved string
