diff --git a/charts/cluster-autoscaler/Chart.yaml b/charts/cluster-autoscaler/Chart.yaml index 1f81b953e34f..5b330ea439c8 100644 --- a/charts/cluster-autoscaler/Chart.yaml +++ b/charts/cluster-autoscaler/Chart.yaml @@ -17,4 +17,4 @@ name: cluster-autoscaler sources: - https://github.com/kubernetes/autoscaler/tree/master/cluster-autoscaler type: application -version: 9.14.0 +version: 9.15.0 diff --git a/charts/cluster-autoscaler/README.md b/charts/cluster-autoscaler/README.md index e358316171d6..a55d3efd0f87 100644 --- a/charts/cluster-autoscaler/README.md +++ b/charts/cluster-autoscaler/README.md @@ -209,6 +209,18 @@ Install the chart with ``` $ helm install my-release autoscaler/cluster-autoscaler -f myvalues.yaml ``` +### Cluster-API + +`cloudProvider: clusterapi` must be set, and then one or more of +- `autoDiscovery.clusterName` +- or `autoDiscovery.labels` +See [here](https://github.com/kubernetes/autoscaler/blob/master/cluster-autoscaler/cloudprovider/clusterapi/README.md#configuring-node-group-auto-discovery) for more details + +Additional config parameters avaible, see the `values.yaml` for more details +`clusterAPIMode` +`clusterAPIKubeconfigSecret` +`clusterAPIWorkloadKubeconfigPath` +`clusterAPICloudConfigPath` ## Uninstalling the Chart @@ -338,7 +350,8 @@ Though enough for the majority of installations, the default PodSecurityPolicy _ |-----|------|---------|-------------| | additionalLabels | object | `{}` | Labels to add to each object of the chart. | | affinity | object | `{}` | Affinity for pod assignment | -| autoDiscovery.clusterName | string | `nil` | Enable autodiscovery for `cloudProvider=aws`, for groups matching `autoDiscovery.tags`. Enable autodiscovery for `cloudProvider=gce`, but no MIG tagging required. Enable autodiscovery for `cloudProvider=magnum`, for groups matching `autoDiscovery.roles`. | +| autoDiscovery.clusterName | string | `nil` | Enable autodiscovery for `cloudProvider=aws`, for groups matching `autoDiscovery.tags`. Enable autodiscovery for `cloudProvider=clusterapi`, for groups matching `autoDiscovery.labels`. Enable autodiscovery for `cloudProvider=gce`, but no MIG tagging required. Enable autodiscovery for `cloudProvider=magnum`, for groups matching `autoDiscovery.roles`. | +| autoDiscovery.labels | list | `[]` | Cluster-API labels to match https://github.com/kubernetes/autoscaler/blob/master/cluster-autoscaler/cloudprovider/clusterapi/README.md#configuring-node-group-auto-discovery | | autoDiscovery.roles | list | `["worker"]` | Magnum node group roles to match. | | autoDiscovery.tags | list | `["k8s.io/cluster-autoscaler/enabled","k8s.io/cluster-autoscaler/{{ .Values.autoDiscovery.clusterName }}"]` | ASG tags to match, run through `tpl`. | | autoscalingGroups | list | `[]` | For AWS, Azure AKS or Magnum. At least one element is required if not using `autoDiscovery`. For example:
- name: asg1| @@ -356,7 +369,11 @@ Though enough for the majority of installations, the default PodSecurityPolicy _ | azureUseManagedIdentityExtension | bool | `false` | Whether to use Azure's managed identity extension for credentials. If using MSI, ensure subscription ID, resource group, and azure AKS cluster name are set. | | azureVMType | string | `"AKS"` | Azure VM type. | | cloudConfigPath | string | `"/etc/gce.conf"` | Configuration file for cloud provider. | -| cloudProvider | string | `"aws"` | The cloud provider where the autoscaler runs. Currently only `gce`, `aws`, `azure` and `magnum` are supported. `aws` supported for AWS. `gce` for GCE. `azure` for Azure AKS. `magnum` for OpenStack Magnum. | +| cloudProvider | string | `"aws"` | The cloud provider where the autoscaler runs. Currently only `gce`, `aws`, `azure`, `magnum` and `clusterapi` are supported. `aws` supported for AWS. `gce` for GCE. `azure` for Azure AKS. `magnum` for OpenStack Magnum, `clusterapi` for Cluster API. | +| clusterAPICloudConfigPath | string | `"/etc/kubernetes/mgmt-kubeconfig"` | Path to kubeconfig for connecting to Cluster API Management Cluster, only used if `clusterAPIMode=kubeconfig-kubeconfig or incluster-kubeconfig` | +| clusterAPIKubeconfigSecret | string | `""` | Secret containing kubeconfig for connecting to Cluster API managed workloadcluster Required if `cloudProvider=clusterapi` and `clusterAPIMode=kubeconfig-kubeconfig,kubeconfig-incluster or incluster-kubeconfig` | +| clusterAPIMode | string | `"incluster-incluster"` | Cluster API mode, see https://github.com/kubernetes/autoscaler/blob/master/cluster-autoscaler/cloudprovider/clusterapi/README.md#connecting-cluster-autoscaler-to-cluster-api-management-and-workload-clusters Syntax: workloadClusterMode-ManagementClusterMode for `kubeconfig-kubeconfig`, `incluster-kubeconfig` and `single-kubeconfig` you always must mount the external kubeconfig using either `extraVolumeSecrets` or `extraMounts` and `extraVolumes` if you dont set `clusterAPIKubeconfigSecret`and thus use an in-cluster config or want to use a non capi generated kubeconfig you must do so for the workload kubeconfig as well | +| clusterAPIWorkloadKubeconfigPath | string | `"/etc/kubernetes/value"` | Path to kubeconfig for connecting to Cluster API managed workloadcluster, only used if `clusterAPIMode=kubeconfig-kubeconfig or kubeconfig-incluster` | | containerSecurityContext | object | `{}` | [Security context for container](https://kubernetes.io/docs/tasks/configure-pod-container/security-context/) | | dnsPolicy | string | `"ClusterFirst"` | Defaults to `ClusterFirst`. Valid values are: `ClusterFirstWithHostNet`, `ClusterFirst`, `Default` or `None`. If autoscaler does not depend on cluster DNS, recommended to set this to `Default`. | | envFromConfigMap | string | `""` | ConfigMap name to use as envFrom. | @@ -389,6 +406,7 @@ Though enough for the majority of installations, the default PodSecurityPolicy _ | prometheusRule.interval | string | `nil` | How often rules in the group are evaluated (falls back to `global.evaluation_interval` if not set). | | prometheusRule.namespace | string | `"monitoring"` | Namespace which Prometheus is running in. | | prometheusRule.rules | list | `[]` | Rules spec template (see https://github.com/prometheus-operator/prometheus-operator/blob/master/Documentation/api.md#rule). | +| rbac.clusterScoped | bool | `true` | if set to false will only provision RBAC to alter resources in the current namespace. Most useful for Cluster-API | | rbac.create | bool | `true` | If `true`, create and use RBAC resources. | | rbac.pspEnabled | bool | `false` | If `true`, creates and uses RBAC resources required in the cluster with [Pod Security Policies](https://kubernetes.io/docs/concepts/policy/pod-security-policy/) enabled. Must be used with `rbac.create` set to `true`. | | rbac.serviceAccount.annotations | object | `{}` | Additional Service Account annotations. | diff --git a/charts/cluster-autoscaler/README.md.gotmpl b/charts/cluster-autoscaler/README.md.gotmpl index dda305c6b43a..5e50ec4ba2ec 100644 --- a/charts/cluster-autoscaler/README.md.gotmpl +++ b/charts/cluster-autoscaler/README.md.gotmpl @@ -209,6 +209,19 @@ Install the chart with ``` $ helm install my-release autoscaler/cluster-autoscaler -f myvalues.yaml ``` +### Cluster-API + +`cloudProvider: clusterapi` must be set, and then one or more of +- `autoDiscovery.clusterName` +- or `autoDiscovery.labels` +See [here](https://github.com/kubernetes/autoscaler/blob/master/cluster-autoscaler/cloudprovider/clusterapi/README.md#configuring-node-group-auto-discovery) for more details + + +Additional config parameters avaible, see the `values.yaml` for more details +`clusterAPIMode` +`clusterAPIKubeconfigSecret` +`clusterAPIWorkloadKubeconfigPath` +`clusterAPICloudConfigPath` ## Uninstalling the Chart diff --git a/charts/cluster-autoscaler/templates/_helpers.tpl b/charts/cluster-autoscaler/templates/_helpers.tpl index 95086b141500..726086e8992d 100644 --- a/charts/cluster-autoscaler/templates/_helpers.tpl +++ b/charts/cluster-autoscaler/templates/_helpers.tpl @@ -94,4 +94,24 @@ Return true if the priority expander is enabled {{- if has "priority" $expanders -}} {{- true -}} {{- end -}} -{{- end -}} \ No newline at end of file +{{- end -}} + +{{/* +Return the autodiscoveryparameters for clusterapi. +*/}} +{{- define "cluster-autoscaler.capiAutodiscoveryConfig" -}} +{{- if .Values.autoDiscovery.clusterName -}} +{{- print "clusterName=" -}}{{ .Values.autoDiscovery.clusterName }} +{{- end -}} +{{- if and .Values.autoDiscovery.clusterName .Values.autoDiscovery.labels -}} +{{- print "," -}} +{{- end -}} +{{- if .Values.autoDiscovery.labels -}} +{{- range $i, $el := .Values.autoDiscovery.labels -}} +{{- if $i -}}{{- print "," -}}{{- end -}} +{{- range $key, $val := $el -}} +{{- $key -}}{{- print "=" -}}{{- $val -}} +{{- end -}} +{{- end -}} +{{- end -}} +{{- end -}} diff --git a/charts/cluster-autoscaler/templates/clusterrole.yaml b/charts/cluster-autoscaler/templates/clusterrole.yaml index 409fbe2a80c3..d9153eeca417 100644 --- a/charts/cluster-autoscaler/templates/clusterrole.yaml +++ b/charts/cluster-autoscaler/templates/clusterrole.yaml @@ -1,4 +1,4 @@ -{{- if .Values.rbac.create -}} +{{- if and .Values.rbac.create .Values.rbac.clusterScoped -}} apiVersion: rbac.authorization.k8s.io/v1 kind: ClusterRole metadata: @@ -146,5 +146,18 @@ rules: verbs: - use {{- end -}} - +{{- if and ( and ( eq .Values.cloudProvider "clusterapi" ) ( .Values.rbac.clusterScoped ) ( or ( eq .Values.clusterAPIMode "incluster-incluster" ) ( eq .Values.clusterAPIMode "incluster-kubeconfig" ) ))}} + - apiGroups: + - cluster.x-k8s.io + resources: + - machinedeployments + - machinedeployments/scale + - machines + - machinesets + verbs: + - get + - list + - update + - watch +{{- end }} {{- end -}} diff --git a/charts/cluster-autoscaler/templates/clusterrolebinding.yaml b/charts/cluster-autoscaler/templates/clusterrolebinding.yaml index d1e8308ad4df..d2384dc629f6 100644 --- a/charts/cluster-autoscaler/templates/clusterrolebinding.yaml +++ b/charts/cluster-autoscaler/templates/clusterrolebinding.yaml @@ -1,4 +1,4 @@ -{{- if .Values.rbac.create -}} +{{- if and .Values.rbac.create .Values.rbac.clusterScoped -}} apiVersion: rbac.authorization.k8s.io/v1 kind: ClusterRoleBinding metadata: diff --git a/charts/cluster-autoscaler/templates/deployment.yaml b/charts/cluster-autoscaler/templates/deployment.yaml index d68107273303..b6980da89bb1 100644 --- a/charts/cluster-autoscaler/templates/deployment.yaml +++ b/charts/cluster-autoscaler/templates/deployment.yaml @@ -1,4 +1,4 @@ -{{- if or .Values.autoDiscovery.clusterName .Values.autoscalingGroups }} +{{- if or ( or .Values.autoDiscovery.clusterName .Values.autoDiscovery.labels ) .Values.autoscalingGroups }} {{/* one of the above is required */}} apiVersion: {{ template "deployment.apiVersion" . }} kind: Deployment @@ -69,6 +69,21 @@ spec: {{- else }} - --cluster-name={{ .Values.magnumClusterName }} {{- end }} + {{- else if eq .Values.cloudProvider "clusterapi" }} + {{- if or .Values.autoDiscovery.clusterName .Values.autoDiscovery.labels }} + - --node-group-auto-discovery=clusterapi:{{ template "cluster-autoscaler.capiAutodiscoveryConfig" . }} + {{- end }} + {{- if eq .Values.clusterAPIMode "incluster-kubeconfig"}} + - --cloud-config={{ .Values.clusterAPICloudConfigPath }} + {{- else if eq .Values.clusterAPIMode "kubeconfig-incluster"}} + - --kubeconfig={{ .Values.clusterAPIWorkloadKubeconfigPath }} + - --clusterapi-cloud-config-authoritative + {{- else if eq .Values.clusterAPIMode "kubeconfig-kubeconfig"}} + - --kubeconfig={{ .Values.clusterAPIWorkloadKubeconfigPath }} + - --cloud-config={{ .Values.clusterAPICloudConfigPath }} + {{- else if eq .Values.clusterAPIMode "single-kubeconfig"}} + - --kubeconfig={{ .Values.clusterAPIWorkloadKubeconfigPath }} + {{- end }} {{- end }} {{- if eq .Values.cloudProvider "magnum" }} - --cloud-config={{ .Values.cloudConfigPath }} @@ -203,6 +218,10 @@ spec: mountPath: {{ required "Must specify mountPath!" $value.mountPath }} readOnly: true {{- end }} + {{- if .Values.clusterAPIKubeconfigSecret }} + - name: cluster-api-kubeconfig + mountPath: {{ .Values.clusterAPIWorkloadKubeconfigPath | trimSuffix "/value" }} + {{- end }} {{- if .Values.extraVolumeMounts }} {{ toYaml .Values.extraVolumeMounts | nindent 12 }} {{- end }} @@ -250,6 +269,11 @@ spec: {{- if .Values.extraVolumes }} {{- toYaml .Values.extraVolumes | nindent 10 }} {{- end }} + {{- if .Values.clusterAPIKubeconfigSecret }} + - name: cluster-api-kubeconfig + secret: + secretName: {{ .Values.clusterAPIKubeconfigSecret }} + {{- end }} {{- end }} {{- if .Values.image.pullSecrets }} imagePullSecrets: diff --git a/charts/cluster-autoscaler/templates/role.yaml b/charts/cluster-autoscaler/templates/role.yaml index 6aa1a1ec5a2c..79dc1b6a08c5 100644 --- a/charts/cluster-autoscaler/templates/role.yaml +++ b/charts/cluster-autoscaler/templates/role.yaml @@ -43,4 +43,35 @@ rules: - get - update {{- end }} +{{- if and ( and ( eq .Values.cloudProvider "clusterapi" ) ( not .Values.rbac.clusterScoped ) ( or ( eq .Values.clusterAPIMode "incluster-incluster" ) ( eq .Values.clusterAPIMode "incluster-kubeconfig" ) ))}} + - apiGroups: + - cluster.x-k8s.io + resources: + - machinedeployments + - machinedeployments/scale + - machines + - machinesets + verbs: + - get + - list + - update + - watch +{{- end }} +{{- if ( not .Values.rbac.clusterScoped ) }} + - apiGroups: + - coordination.k8s.io + resources: + - leases + verbs: + - create + - apiGroups: + - coordination.k8s.io + resourceNames: + - cluster-autoscaler + resources: + - leases + verbs: + - get + - update +{{- end }} {{- end -}} diff --git a/charts/cluster-autoscaler/values.yaml b/charts/cluster-autoscaler/values.yaml index 160fb7755d4e..bc82e01220be 100644 --- a/charts/cluster-autoscaler/values.yaml +++ b/charts/cluster-autoscaler/values.yaml @@ -3,24 +3,29 @@ affinity: {} autoDiscovery: - # cloudProviders `aws`, `gce` and `magnum` are supported by auto-discovery at this time + # cloudProviders `aws`, `gce`, `magnum` and `clusterapi` are supported by auto-discovery at this time # AWS: Set tags as described in https://github.com/kubernetes/autoscaler/blob/master/cluster-autoscaler/cloudprovider/aws/README.md#auto-discovery-setup # autoDiscovery.clusterName -- Enable autodiscovery for `cloudProvider=aws`, for groups matching `autoDiscovery.tags`. + # Enable autodiscovery for `cloudProvider=clusterapi`, for groups matching `autoDiscovery.labels`. # Enable autodiscovery for `cloudProvider=gce`, but no MIG tagging required. # Enable autodiscovery for `cloudProvider=magnum`, for groups matching `autoDiscovery.roles`. clusterName: # cluster.local # autoDiscovery.tags -- ASG tags to match, run through `tpl`. tags: - - k8s.io/cluster-autoscaler/enabled - - k8s.io/cluster-autoscaler/{{ .Values.autoDiscovery.clusterName }} + - k8s.io/cluster-autoscaler/enabled + - k8s.io/cluster-autoscaler/{{ .Values.autoDiscovery.clusterName }} # - kubernetes.io/cluster/{{ .Values.autoDiscovery.clusterName }} # autoDiscovery.roles -- Magnum node group roles to match. roles: - - worker + - worker + # autoDiscovery.labels -- Cluster-API labels to match https://github.com/kubernetes/autoscaler/blob/master/cluster-autoscaler/cloudprovider/clusterapi/README.md#configuring-node-group-auto-discovery + labels: [] + # - color: green + # - shape: circle # autoscalingGroups -- For AWS, Azure AKS or Magnum. At least one element is required if not using `autoDiscovery`. For example: #
maxSize: 2
minSize: 1
# - name: asg1
@@ -99,13 +104,29 @@ magnumClusterName: "" # magnumCABundlePath -- Path to the host's CA bundle, from `ca-file` in the cloud-config file. magnumCABundlePath: "/etc/kubernetes/ca-bundle.crt" +# clusterAPIMode -- Cluster API mode, see https://github.com/kubernetes/autoscaler/blob/master/cluster-autoscaler/cloudprovider/clusterapi/README.md#connecting-cluster-autoscaler-to-cluster-api-management-and-workload-clusters +# Syntax: workloadClusterMode-ManagementClusterMode +# for `kubeconfig-kubeconfig`, `incluster-kubeconfig` and `single-kubeconfig` you always must mount the external kubeconfig using either `extraVolumeSecrets` or `extraMounts` and `extraVolumes` +# if you dont set `clusterAPIKubeconfigSecret`and thus use an in-cluster config or want to use a non capi generated kubeconfig you must do so for the workload kubeconfig as well +clusterAPIMode: incluster-incluster # incluster-incluster, incluster-kubeconfig, kubeconfig-incluster, kubeconfig-kubeconfig, single-kubeconfig + +# clusterAPIKubeconfigSecret -- Secret containing kubeconfig for connecting to Cluster API managed workloadcluster +# Required if `cloudProvider=clusterapi` and `clusterAPIMode=kubeconfig-kubeconfig,kubeconfig-incluster or incluster-kubeconfig` +clusterAPIKubeconfigSecret: "" + +# clusterAPIWorkloadKubeconfigPath -- Path to kubeconfig for connecting to Cluster API managed workloadcluster, only used if `clusterAPIMode=kubeconfig-kubeconfig or kubeconfig-incluster` +clusterAPIWorkloadKubeconfigPath: /etc/kubernetes/value + +# clusterAPICloudConfigPath -- Path to kubeconfig for connecting to Cluster API Management Cluster, only used if `clusterAPIMode=kubeconfig-kubeconfig or incluster-kubeconfig` +clusterAPICloudConfigPath: /etc/kubernetes/mgmt-kubeconfig + # cloudConfigPath -- Configuration file for cloud provider. cloudConfigPath: /etc/gce.conf # cloudProvider -- The cloud provider where the autoscaler runs. -# Currently only `gce`, `aws`, `azure` and `magnum` are supported. +# Currently only `gce`, `aws`, `azure`, `magnum` and `clusterapi` are supported. # `aws` supported for AWS. `gce` for GCE. `azure` for Azure AKS. -# `magnum` for OpenStack Magnum. +# `magnum` for OpenStack Magnum, `clusterapi` for Cluster API. cloudProvider: aws # containerSecurityContext -- [Security context for container](https://kubernetes.io/docs/tasks/configure-pod-container/security-context/) @@ -247,6 +268,8 @@ rbac: # rbac.pspEnabled -- If `true`, creates and uses RBAC resources required in the cluster with [Pod Security Policies](https://kubernetes.io/docs/concepts/policy/pod-security-policy/) enabled. # Must be used with `rbac.create` set to `true`. pspEnabled: false + # rbac.clusterScoped -- if set to false will only provision RBAC to alter resources in the current namespace. Most useful for Cluster-API + clusterScoped: true serviceAccount: # rbac.serviceAccount.annotations -- Additional Service Account annotations. annotations: {} diff --git a/cluster-autoscaler/FAQ.md b/cluster-autoscaler/FAQ.md index 50bb9def04f6..8317293556e0 100644 --- a/cluster-autoscaler/FAQ.md +++ b/cluster-autoscaler/FAQ.md @@ -683,8 +683,7 @@ would match the cluster size. This expander is described in more details * `priority` - selects the node group that has the highest priority assigned by the user. It's configuration is described in more details [here](expander/priority/readme.md) - -Multiple expanders may be passed, i.e. +From 1.23.0 onwards, multiple expanders may be passed, i.e. `.cluster-autoscaler --expander=priority,least-waste` This will cause the `least-waste` expander to be used as a fallback in the event that the priority expander selects multiple node groups. In general, a list of expanders can be used, where the output of one is passed to the next and the final decision by randomly selecting one. An expander must not appear in the list more than once. diff --git a/cluster-autoscaler/cloudprovider/aws/CA_with_AWS_IAM_OIDC.md b/cluster-autoscaler/cloudprovider/aws/CA_with_AWS_IAM_OIDC.md index 217bd3d505eb..cf251787c234 100644 --- a/cluster-autoscaler/cloudprovider/aws/CA_with_AWS_IAM_OIDC.md +++ b/cluster-autoscaler/cloudprovider/aws/CA_with_AWS_IAM_OIDC.md @@ -53,9 +53,11 @@ D) Set up [Cluster Autoscaler Auto-Discovery] using the [tutorial] . Note: The keys for the tags that you entered don't have values. Cluster Autoscaler ignores any value set for the keys. -- Create an IAM Policy for cluster autoscaler and to enable AutoDiscovery. +__NOTE:__ Please see [the README](README.md#IAM-Policy) for more information on best practices with this IAM role. -```sh +- Create an IAM Policy for cluster autoscaler and to enable AutoDiscovery as well as discovery of instance types. + +```json { "Version": "2012-10-17", "Statement": [ @@ -66,17 +68,23 @@ Note: The keys for the tags that you entered don't have values. Cluster Autoscal "autoscaling:DescribeAutoScalingInstances", "autoscaling:DescribeLaunchConfigurations", "autoscaling:DescribeTags", + "ec2:DescribeInstanceTypes", + "ec2:DescribeLaunchTemplateVersions" + ], + "Resource": ["*"] + }, + { + "Effect": "Allow", + "Action": [ "autoscaling:SetDesiredCapacity", "autoscaling:TerminateInstanceInAutoScalingGroup" ], - "Resource": "*" + "Resource": ["*"] } ] } ``` -NOTE: ``` autoscaling:DescribeTags ``` is very important if you are making use of the AutoDiscovery feature of the Cluster AutoScaler. - - Attach the above created policy to the *instance role* that's attached to your Amazon EKS worker nodes. - Download a deployment example file provided by the Cluster Autoscaler project on GitHub, run the following command: diff --git a/cluster-autoscaler/cloudprovider/aws/README.md b/cluster-autoscaler/cloudprovider/aws/README.md index d90ce7f483b2..b03377e0b412 100644 --- a/cluster-autoscaler/cloudprovider/aws/README.md +++ b/cluster-autoscaler/cloudprovider/aws/README.md @@ -19,7 +19,23 @@ the EC2 instance on which the Cluster Autoscaler pod runs. ### IAM Policy -The following policy provides the minimum privileges necessary for Cluster Autoscaler to run: +There are a number of ways to run the autoscaler in AWS, which can significantly +impact the range of IAM permissions required for the Cluster Autoscaler to function +properly. Two options are provided below, one which will allow use of all of the +features of the Cluster Autoscaler, the second with a more limited range of IAM +actions enabled, which enforces using certain configuration options in the +Cluster Autoscaler binary. + +It is strongly recommended to restrict the target resources for the autoscaling actions +by either [specifying Auto Scaling Group ARNs](https://docs.aws.amazon.com/autoscaling/latest/userguide/control-access-using-iam.html#policy-auto-scaling-resources) in the `Resource` list of the policy or +[using tag based conditionals](https://docs.aws.amazon.com/autoscaling/ec2/userguide/control-access-using-iam.html#security_iam_service-with-iam-tags). The [minimal policy](#minimal-iam-permissions-policy) +includes an example of restricting by ASG ARN. + +#### Full Cluster Autoscaler Features Policy (Recommended) + +Permissions required when using [ASG Autodiscovery](#Auto-discovery-setup) and +Dynamic EC2 List Generation (the default behaviour). In this example, only the second block of actions +should be updated to restrict the resources/add conditionals: ```json { @@ -31,6 +47,15 @@ The following policy provides the minimum privileges necessary for Cluster Autos "autoscaling:DescribeAutoScalingGroups", "autoscaling:DescribeAutoScalingInstances", "autoscaling:DescribeLaunchConfigurations", + "autoscaling:DescribeTags", + "ec2:DescribeInstanceTypes", + "ec2:DescribeLaunchTemplateVersions" + ], + "Resource": ["*"] + }, + { + "Effect": "Allow", + "Action": [ "autoscaling:SetDesiredCapacity", "autoscaling:TerminateInstanceInAutoScalingGroup", "ec2:DescribeImages", @@ -43,17 +68,42 @@ The following policy provides the minimum privileges necessary for Cluster Autos } ``` -If you'd like Cluster Autoscaler to [automatically -discover](#auto-discovery-setup) EC2 Auto Scaling Groups **(recommended)**, add -`autoscaling:DescribeTags` to the `Action` list. Also add -`autoscaling:DescribeLaunchConfigurations` (if you created your ASG using a -Launch Configuration) and/or `ec2:DescribeLaunchTemplateVersions` (if you -created your ASG using a Launch Template) to the `Action` list. - -If you prefer, you can restrict the target resources for the autoscaling actions -by specifying Auto Scaling Group ARNs in the `Resource` list of the policy. More -information can be found -[here](https://docs.aws.amazon.com/autoscaling/latest/userguide/control-access-using-iam.html#policy-auto-scaling-resources). +#### Minimal IAM Permissions Policy + +*NOTE:* The below policies/arguments to the Cluster Autoscaler need to be modified as appropriate +for the names of your ASGs, as well as account ID and AWS region before being used. + +The following policy provides the minimum privileges necessary for Cluster Autoscaler to run. +When using this policy, you cannot use autodiscovery of ASGs. In addition, it restricts the +IAM permissions to the node groups the Cluster Autoscaler is configured to scale. + +This in turn means that you must pass the following arguments to the Cluster Autoscaler +binary, replacing min and max node counts and the ASG: + +```bash +--aws-use-static-instance-list=false +--nodes=1:100:exampleASG1 +--nodes=1:100:exampleASG2 +``` + +```json +{ + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Action": [ + "autoscaling:DescribeAutoScalingGroups", + "autoscaling:DescribeAutoScalingInstances", + "autoscaling:DescribeLaunchConfigurations", + "autoscaling:SetDesiredCapacity", + "autoscaling:TerminateInstanceInAutoScalingGroup" + ], + "Resource": ["arn:aws:autoscaling:${YOUR_CLUSTER_AWS_REGION}:${YOUR_AWS_ACCOUNT_ID}:autoScalingGroup:*:autoScalingGroupName/${YOUR_ASG_NAME}"] + } + ] +} +``` ### Using OIDC Federated Authentication diff --git a/cluster-autoscaler/cloudprovider/aws/ec2_instance_types.go b/cluster-autoscaler/cloudprovider/aws/ec2_instance_types.go index fd806f36e410..aae69c9c9e09 100644 --- a/cluster-autoscaler/cloudprovider/aws/ec2_instance_types.go +++ b/cluster-autoscaler/cloudprovider/aws/ec2_instance_types.go @@ -28,7 +28,7 @@ type InstanceType struct { } // StaticListLastUpdateTime is a string declaring the last time the static list was updated. -var StaticListLastUpdateTime = "2021-12-13" +var StaticListLastUpdateTime = "2022-02-16" // InstanceTypes is a map of ec2 resources var InstanceTypes = map[string]*InstanceType{ @@ -445,6 +445,76 @@ var InstanceTypes = map[string]*InstanceType{ GPU: 0, Architecture: "amd64", }, + "c6a.12xlarge": { + InstanceType: "c6a.12xlarge", + VCPU: 48, + MemoryMb: 98304, + GPU: 0, + Architecture: "amd64", + }, + "c6a.16xlarge": { + InstanceType: "c6a.16xlarge", + VCPU: 64, + MemoryMb: 131072, + GPU: 0, + Architecture: "amd64", + }, + "c6a.24xlarge": { + InstanceType: "c6a.24xlarge", + VCPU: 96, + MemoryMb: 196608, + GPU: 0, + Architecture: "amd64", + }, + "c6a.2xlarge": { + InstanceType: "c6a.2xlarge", + VCPU: 8, + MemoryMb: 16384, + GPU: 0, + Architecture: "amd64", + }, + "c6a.32xlarge": { + InstanceType: "c6a.32xlarge", + VCPU: 128, + MemoryMb: 262144, + GPU: 0, + Architecture: "amd64", + }, + "c6a.48xlarge": { + InstanceType: "c6a.48xlarge", + VCPU: 192, + MemoryMb: 393216, + GPU: 0, + Architecture: "amd64", + }, + "c6a.4xlarge": { + InstanceType: "c6a.4xlarge", + VCPU: 16, + MemoryMb: 32768, + GPU: 0, + Architecture: "amd64", + }, + "c6a.8xlarge": { + InstanceType: "c6a.8xlarge", + VCPU: 32, + MemoryMb: 65536, + GPU: 0, + Architecture: "amd64", + }, + "c6a.large": { + InstanceType: "c6a.large", + VCPU: 2, + MemoryMb: 4096, + GPU: 0, + Architecture: "amd64", + }, + "c6a.xlarge": { + InstanceType: "c6a.xlarge", + VCPU: 4, + MemoryMb: 8192, + GPU: 0, + Architecture: "amd64", + }, "c6g.12xlarge": { InstanceType: "c6g.12xlarge", VCPU: 48, @@ -3098,6 +3168,13 @@ var InstanceTypes = map[string]*InstanceType{ GPU: 0, Architecture: "amd64", }, + "u-3tb1.56xlarge": { + InstanceType: "u-3tb1.56xlarge", + VCPU: 224, + MemoryMb: 3145728, + GPU: 0, + Architecture: "amd64", + }, "u-6tb1.112xlarge": { InstanceType: "u-6tb1.112xlarge", VCPU: 448, @@ -3259,6 +3336,48 @@ var InstanceTypes = map[string]*InstanceType{ GPU: 0, Architecture: "arm64", }, + "x2iezn.12xlarge": { + InstanceType: "x2iezn.12xlarge", + VCPU: 48, + MemoryMb: 1572864, + GPU: 0, + Architecture: "amd64", + }, + "x2iezn.2xlarge": { + InstanceType: "x2iezn.2xlarge", + VCPU: 8, + MemoryMb: 262144, + GPU: 0, + Architecture: "amd64", + }, + "x2iezn.4xlarge": { + InstanceType: "x2iezn.4xlarge", + VCPU: 16, + MemoryMb: 524288, + GPU: 0, + Architecture: "amd64", + }, + "x2iezn.6xlarge": { + InstanceType: "x2iezn.6xlarge", + VCPU: 24, + MemoryMb: 786432, + GPU: 0, + Architecture: "amd64", + }, + "x2iezn.8xlarge": { + InstanceType: "x2iezn.8xlarge", + VCPU: 32, + MemoryMb: 1048576, + GPU: 0, + Architecture: "amd64", + }, + "x2iezn.metal": { + InstanceType: "x2iezn.metal", + VCPU: 48, + MemoryMb: 1572864, + GPU: 0, + Architecture: "amd64", + }, "z1d.12xlarge": { InstanceType: "z1d.12xlarge", VCPU: 48, diff --git a/cluster-autoscaler/cloudprovider/azure/azure_kubernetes_service_pool.go b/cluster-autoscaler/cloudprovider/azure/azure_kubernetes_service_pool.go index acd98cf55bc9..ea65f8f23a7e 100644 --- a/cluster-autoscaler/cloudprovider/azure/azure_kubernetes_service_pool.go +++ b/cluster-autoscaler/cloudprovider/azure/azure_kubernetes_service_pool.go @@ -32,6 +32,11 @@ import ( schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework" ) +const ( + aksManagedPoolNameTag = "aks-managed-poolName" + legacyAKSPoolNameTag = "poolName" +) + //AKSAgentPool implements NodeGroup interface for agent pool deployed in AKS type AKSAgentPool struct { azureRef @@ -315,7 +320,10 @@ func (agentPool *AKSAgentPool) DeleteNodes(nodes []*apiv1.Node) error { //IsAKSNode checks if the tag from the vm matches the agentPool name func (agentPool *AKSAgentPool) IsAKSNode(tags map[string]*string) bool { - poolName := tags["poolName"] + poolName := tags[aksManagedPoolNameTag] + if poolName == nil { + poolName = tags[legacyAKSPoolNameTag] + } if poolName != nil { klog.V(5).Infof("Matching agentPool name: %s with tag name: %s", agentPool.azureRef.Name, *poolName) if strings.EqualFold(*poolName, agentPool.azureRef.Name) { diff --git a/cluster-autoscaler/cloudprovider/azure/azure_kubernetes_sercice_pool_test.go b/cluster-autoscaler/cloudprovider/azure/azure_kubernetes_service_pool_test.go similarity index 96% rename from cluster-autoscaler/cloudprovider/azure/azure_kubernetes_sercice_pool_test.go rename to cluster-autoscaler/cloudprovider/azure/azure_kubernetes_service_pool_test.go index cb8f8796b09e..85487b07cf4f 100644 --- a/cluster-autoscaler/cloudprovider/azure/azure_kubernetes_sercice_pool_test.go +++ b/cluster-autoscaler/cloudprovider/azure/azure_kubernetes_service_pool_test.go @@ -265,13 +265,17 @@ func TestAKSIncreaseSize(t *testing.T) { func TestIsAKSNode(t *testing.T) { aksPool := getTestAKSPool(newTestAzureManager(t), testAKSPoolName) - tags := map[string]*string{"poolName": to.StringPtr(testAKSPoolName)} + tags := map[string]*string{aksManagedPoolNameTag: to.StringPtr(testAKSPoolName)} isAKSNode := aksPool.IsAKSNode(tags) assert.True(t, isAKSNode) - tags = map[string]*string{"poolName": to.StringPtr("fake")} + tags = map[string]*string{aksManagedPoolNameTag: to.StringPtr("fake")} isAKSNode = aksPool.IsAKSNode(tags) assert.False(t, isAKSNode) + + tags = map[string]*string{legacyAKSPoolNameTag: to.StringPtr(testAKSPoolName)} + isAKSNode = aksPool.IsAKSNode(tags) + assert.True(t, isAKSNode) } func TestDeleteNodesAKS(t *testing.T) { @@ -346,7 +350,7 @@ func TestAKSNodes(t *testing.T) { { Name: to.StringPtr("name"), ID: to.StringPtr("/subscriptions/sub/resourceGroups/rg/providers/provider/vm1"), - Tags: map[string]*string{"poolName": to.StringPtr(testAKSPoolName)}, + Tags: map[string]*string{aksManagedPoolNameTag: to.StringPtr(testAKSPoolName)}, }, } @@ -394,7 +398,7 @@ func TestAKSDecreaseTargetSize(t *testing.T) { { Name: to.StringPtr("name"), ID: to.StringPtr("/subscriptions/sub/resourceGroups/rg/providers/provider/vm1"), - Tags: map[string]*string{"poolName": to.StringPtr(testAKSPoolName)}, + Tags: map[string]*string{aksManagedPoolNameTag: to.StringPtr(testAKSPoolName)}, }, } mockVMClient := mockvmclient.NewMockInterface(ctrl) diff --git a/cluster-autoscaler/cloudprovider/azure/azure_manager_test.go b/cluster-autoscaler/cloudprovider/azure/azure_manager_test.go index dd7e0b0276e4..76940e1143c5 100644 --- a/cluster-autoscaler/cloudprovider/azure/azure_manager_test.go +++ b/cluster-autoscaler/cloudprovider/azure/azure_manager_test.go @@ -697,6 +697,7 @@ func TestGetFilteredAutoscalingGroupsVmss(t *testing.T) { maxSize: maxVal, manager: manager, curSize: 3, + sizeRefreshPeriod: manager.azureCache.refreshInterval, instancesRefreshPeriod: defaultVmssInstancesRefreshPeriod, }} assert.True(t, assert.ObjectsAreEqualValues(expectedAsgs, asgs), "expected %#v, but found: %#v", expectedAsgs, asgs) diff --git a/cluster-autoscaler/cloudprovider/azure/azure_scale_set.go b/cluster-autoscaler/cloudprovider/azure/azure_scale_set.go index de4619d29e7f..f8b20ef8128c 100644 --- a/cluster-autoscaler/cloudprovider/azure/azure_scale_set.go +++ b/cluster-autoscaler/cloudprovider/azure/azure_scale_set.go @@ -53,6 +53,9 @@ type ScaleSet struct { sizeMutex sync.Mutex curSize int64 + lastSizeRefresh time.Time + sizeRefreshPeriod time.Duration + instancesRefreshPeriod time.Duration instancesRefreshJitter int @@ -67,11 +70,11 @@ func NewScaleSet(spec *dynamic.NodeGroupSpec, az *AzureManager, curSize int64) ( azureRef: azureRef{ Name: spec.Name, }, - minSize: spec.MinSize, - maxSize: spec.MaxSize, - manager: az, - curSize: curSize, - + minSize: spec.MinSize, + maxSize: spec.MaxSize, + manager: az, + curSize: curSize, + sizeRefreshPeriod: az.azureCache.refreshInterval, instancesRefreshJitter: az.config.VmssVmsCacheJitter, } @@ -140,6 +143,11 @@ func (scaleSet *ScaleSet) getCurSize() (int64, error) { scaleSet.sizeMutex.Lock() defer scaleSet.sizeMutex.Unlock() + if scaleSet.lastSizeRefresh.Add(scaleSet.sizeRefreshPeriod).After(time.Now()) { + klog.V(3).Infof("VMSS: %s, returning in-memory size: %d", scaleSet.Name, scaleSet.curSize) + return scaleSet.curSize, nil + } + set, err := scaleSet.getVMSSFromCache() if err != nil { klog.Errorf("failed to get information for VMSS: %s, error: %v", scaleSet.Name, err) @@ -148,7 +156,7 @@ func (scaleSet *ScaleSet) getCurSize() (int64, error) { // If VMSS state is updating, return the currentSize which would've been proactively incremented or decremented by CA if set.VirtualMachineScaleSetProperties != nil && strings.EqualFold(to.String(set.VirtualMachineScaleSetProperties.ProvisioningState), string(compute.ProvisioningStateUpdating)) { - klog.V(3).Infof("VMSS %q is in updating state, returning cached size: %d", scaleSet.Name, scaleSet.curSize) + klog.V(3).Infof("VMSS %q is in updating state, returning in-memory size: %d", scaleSet.Name, scaleSet.curSize) return scaleSet.curSize, nil } @@ -161,9 +169,10 @@ func (scaleSet *ScaleSet) getCurSize() (int64, error) { klog.V(5).Infof("VMSS %q size changed from: %d to %d, invalidating instance cache", scaleSet.Name, scaleSet.curSize, curSize) scaleSet.invalidateInstanceCache() } - klog.V(3).Infof("VMSS: %s, previous size: %d, new size: %d", scaleSet.Name, scaleSet.curSize, curSize) + klog.V(3).Infof("VMSS: %s, in-memory size: %d, new size: %d", scaleSet.Name, scaleSet.curSize, curSize) scaleSet.curSize = curSize + scaleSet.lastSizeRefresh = time.Now() return scaleSet.curSize, nil } @@ -194,6 +203,7 @@ func (scaleSet *ScaleSet) updateVMSSCapacity(future *azure.Future) { if err != nil { klog.Errorf("Failed to update the capacity for vmss %s with error %v, invalidate the cache so as to get the real size from API", scaleSet.Name, err) // Invalidate the VMSS size cache in order to fetch the size from the API. + scaleSet.invalidateLastSizeRefreshWithLock() scaleSet.manager.invalidateCache() } }() @@ -247,6 +257,7 @@ func (scaleSet *ScaleSet) SetScaleSetSize(size int64) error { // Proactively set the VMSS size so autoscaler makes better decisions. scaleSet.curSize = size + scaleSet.lastSizeRefresh = time.Now() go scaleSet.updateVMSSCapacity(future) return nil @@ -405,6 +416,7 @@ func (scaleSet *ScaleSet) DeleteInstances(instances []*azureRef, hasUnregistered if !hasUnregisteredNodes { scaleSet.sizeMutex.Lock() scaleSet.curSize -= int64(len(instanceIDs)) + scaleSet.lastSizeRefresh = time.Now() scaleSet.sizeMutex.Unlock() } @@ -567,6 +579,7 @@ func (scaleSet *ScaleSet) setInstanceStatusByProviderID(providerID string, statu scaleSet.instanceCache[k].Status = &status } } + scaleSet.lastInstanceRefresh = time.Now() } // instanceStatusFromVM converts the VM provisioning state to cloudprovider.InstanceStatus @@ -594,3 +607,9 @@ func (scaleSet *ScaleSet) invalidateInstanceCache() { scaleSet.lastInstanceRefresh = time.Now().Add(-1 * scaleSet.instancesRefreshPeriod) scaleSet.instanceMutex.Unlock() } + +func (scaleSet *ScaleSet) invalidateLastSizeRefreshWithLock() { + scaleSet.sizeMutex.Lock() + scaleSet.lastSizeRefresh = time.Now().Add(-1 * scaleSet.sizeRefreshPeriod) + scaleSet.sizeMutex.Unlock() +} diff --git a/cluster-autoscaler/config/autoscaling_options.go b/cluster-autoscaler/config/autoscaling_options.go index 0843c289197a..0d60b6f2f356 100644 --- a/cluster-autoscaler/config/autoscaling_options.go +++ b/cluster-autoscaler/config/autoscaling_options.go @@ -71,6 +71,10 @@ type AutoscalingOptions struct { EstimatorName string // ExpanderNames sets the chain of node group expanders to be used in scale up ExpanderNames string + // GRPCExpanderCert is the location of the cert passed to the gRPC server for TLS when using the gRPC expander + GRPCExpanderCert string + // GRPCExpanderURL is the url of the gRPC server when using the gRPC expander + GRPCExpanderURL string // IgnoreDaemonSetsUtilization is whether CA will ignore DaemonSet pods when calculating resource utilization for scaling down IgnoreDaemonSetsUtilization bool // IgnoreMirrorPodsUtilization is whether CA will ignore Mirror pods when calculating resource utilization for scaling down diff --git a/cluster-autoscaler/core/autoscaler.go b/cluster-autoscaler/core/autoscaler.go index 6fed440793e9..2e2a9c886931 100644 --- a/cluster-autoscaler/core/autoscaler.go +++ b/cluster-autoscaler/core/autoscaler.go @@ -105,8 +105,8 @@ func initializeDefaultOptions(opts *AutoscalerOptions) error { opts.CloudProvider = cloudBuilder.NewCloudProvider(opts.AutoscalingOptions) } if opts.ExpanderStrategy == nil { - expanderStrategy, err := factory.ExpanderStrategyFromStrings(strings.Split(opts.ExpanderNames, ","), - opts.CloudProvider, opts.AutoscalingKubeClients, opts.KubeClient, opts.ConfigNamespace) + expanderStrategy, err := factory.ExpanderStrategyFromStrings(strings.Split(opts.ExpanderNames, ","), opts.CloudProvider, + opts.AutoscalingKubeClients, opts.KubeClient, opts.ConfigNamespace, opts.GRPCExpanderCert, opts.GRPCExpanderURL) if err != nil { return err } diff --git a/cluster-autoscaler/core/scale_test_common.go b/cluster-autoscaler/core/scale_test_common.go index c689b7f9e42e..fa1c5292e53b 100644 --- a/cluster-autoscaler/core/scale_test_common.go +++ b/cluster-autoscaler/core/scale_test_common.go @@ -148,7 +148,7 @@ func NewTestProcessors() *processors.AutoscalingProcessors { AutoscalingStatusProcessor: &status.NoOpAutoscalingStatusProcessor{}, NodeGroupManager: nodegroups.NewDefaultNodeGroupManager(), NodeInfoProcessor: nodeinfos.NewDefaultNodeInfoProcessor(), - TemplateNodeInfoProvider: nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(), + TemplateNodeInfoProvider: nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil), NodeGroupConfigProcessor: nodegroupconfig.NewDefaultNodeGroupConfigProcessor(), CustomResourcesProcessor: customresources.NewDefaultCustomResourcesProcessor(), ActionableClusterProcessor: actionablecluster.NewDefaultActionableClusterProcessor(), diff --git a/cluster-autoscaler/core/scale_up_test.go b/cluster-autoscaler/core/scale_up_test.go index 854eef9e24e3..e052d2e3739c 100644 --- a/cluster-autoscaler/core/scale_up_test.go +++ b/cluster-autoscaler/core/scale_up_test.go @@ -530,7 +530,7 @@ func runSimpleScaleUpTest(t *testing.T, config *scaleTestConfig) *scaleTestResul } context.ExpanderStrategy = expander - nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider().Process(&context, nodes, []*appsv1.DaemonSet{}, nil, now) + nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil).Process(&context, nodes, []*appsv1.DaemonSet{}, nil, now) clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff()) clusterState.UpdateNodes(nodes, nodeInfos, time.Now()) @@ -691,7 +691,7 @@ func TestScaleUpUnhealthy(t *testing.T) { assert.NoError(t, err) nodes := []*apiv1.Node{n1, n2} - nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider().Process(&context, nodes, []*appsv1.DaemonSet{}, nil, now) + nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil).Process(&context, nodes, []*appsv1.DaemonSet{}, nil, now) clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff()) clusterState.UpdateNodes(nodes, nodeInfos, time.Now()) p3 := BuildTestPod("p-new", 550, 0) @@ -732,7 +732,7 @@ func TestScaleUpNoHelp(t *testing.T) { assert.NoError(t, err) nodes := []*apiv1.Node{n1} - nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider().Process(&context, nodes, []*appsv1.DaemonSet{}, nil, now) + nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil).Process(&context, nodes, []*appsv1.DaemonSet{}, nil, now) clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff()) clusterState.UpdateNodes(nodes, nodeInfos, time.Now()) p3 := BuildTestPod("p-new", 500, 0) @@ -799,7 +799,7 @@ func TestScaleUpBalanceGroups(t *testing.T) { context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, listers, provider, nil, nil) assert.NoError(t, err) - nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider().Process(&context, nodes, []*appsv1.DaemonSet{}, nil, now) + nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil).Process(&context, nodes, []*appsv1.DaemonSet{}, nil, now) clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff()) clusterState.UpdateNodes(nodes, nodeInfos, time.Now()) @@ -867,7 +867,7 @@ func TestScaleUpAutoprovisionedNodeGroup(t *testing.T) { processors.NodeGroupManager = &mockAutoprovisioningNodeGroupManager{t, 0} nodes := []*apiv1.Node{} - nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider().Process(&context, nodes, []*appsv1.DaemonSet{}, nil, time.Now()) + nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil).Process(&context, nodes, []*appsv1.DaemonSet{}, nil, time.Now()) scaleUpStatus, err := ScaleUp(&context, processors, clusterState, []*apiv1.Pod{p1}, nodes, []*appsv1.DaemonSet{}, nodeInfos, nil) assert.NoError(t, err) @@ -920,7 +920,7 @@ func TestScaleUpBalanceAutoprovisionedNodeGroups(t *testing.T) { processors.NodeGroupManager = &mockAutoprovisioningNodeGroupManager{t, 2} nodes := []*apiv1.Node{} - nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider().Process(&context, nodes, []*appsv1.DaemonSet{}, nil, time.Now()) + nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil).Process(&context, nodes, []*appsv1.DaemonSet{}, nil, time.Now()) scaleUpStatus, err := ScaleUp(&context, processors, clusterState, []*apiv1.Pod{p1, p2, p3}, nodes, []*appsv1.DaemonSet{}, nodeInfos, nil) assert.NoError(t, err) diff --git a/cluster-autoscaler/expander/expander.go b/cluster-autoscaler/expander/expander.go index 7558bf428c49..57a91cfa78e7 100644 --- a/cluster-autoscaler/expander/expander.go +++ b/cluster-autoscaler/expander/expander.go @@ -24,7 +24,7 @@ import ( var ( // AvailableExpanders is a list of available expander options - AvailableExpanders = []string{RandomExpanderName, MostPodsExpanderName, LeastWasteExpanderName, PriceBasedExpanderName, PriorityBasedExpanderName} + AvailableExpanders = []string{RandomExpanderName, MostPodsExpanderName, LeastWasteExpanderName, PriceBasedExpanderName, PriorityBasedExpanderName, GRPCExpanderName} // RandomExpanderName selects a node group at random RandomExpanderName = "random" // MostPodsExpanderName selects a node group that fits the most pods @@ -36,6 +36,8 @@ var ( PriceBasedExpanderName = "price" // PriorityBasedExpanderName selects a node group based on a user-configured priorities assigned to group names PriorityBasedExpanderName = "priority" + // GRPCExpanderName uses the gRPC client expander to call to an external gRPC server to select a node group for scale up + GRPCExpanderName = "grpc" ) // Option describes an option to expand the cluster. diff --git a/cluster-autoscaler/expander/factory/expander_factory.go b/cluster-autoscaler/expander/factory/expander_factory.go index 485928032cdb..a0e0b7fe0d5a 100644 --- a/cluster-autoscaler/expander/factory/expander_factory.go +++ b/cluster-autoscaler/expander/factory/expander_factory.go @@ -20,6 +20,7 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" "k8s.io/autoscaler/cluster-autoscaler/context" "k8s.io/autoscaler/cluster-autoscaler/expander" + "k8s.io/autoscaler/cluster-autoscaler/expander/grpcplugin" "k8s.io/autoscaler/cluster-autoscaler/expander/mostpods" "k8s.io/autoscaler/cluster-autoscaler/expander/price" "k8s.io/autoscaler/cluster-autoscaler/expander/priority" @@ -27,14 +28,14 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/expander/waste" "k8s.io/autoscaler/cluster-autoscaler/utils/errors" "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes" - kube_client "k8s.io/client-go/kubernetes" ) // ExpanderStrategyFromStrings creates an expander.Strategy according to the names of the expanders passed in +// take in whole opts and access stuff here func ExpanderStrategyFromStrings(expanderFlags []string, cloudProvider cloudprovider.CloudProvider, autoscalingKubeClients *context.AutoscalingKubeClients, kubeClient kube_client.Interface, - configNamespace string) (expander.Strategy, errors.AutoscalerError) { + configNamespace string, GRPCExpanderCert string, GRPCExpanderURL string) (expander.Strategy, errors.AutoscalerError) { var filters []expander.Filter seenExpanders := map[string]struct{}{} strategySeen := false @@ -67,6 +68,8 @@ func ExpanderStrategyFromStrings(expanderFlags []string, cloudProvider cloudprov stopChannel := make(chan struct{}) lister := kubernetes.NewConfigMapListerForNamespace(kubeClient, stopChannel, configNamespace) filters = append(filters, priority.NewFilter(lister.ConfigMaps(configNamespace), autoscalingKubeClients.Recorder)) + case expander.GRPCExpanderName: + filters = append(filters, grpcplugin.NewFilter(GRPCExpanderCert, GRPCExpanderURL)) default: return nil, errors.NewAutoscalerError(errors.InternalError, "Expander %s not supported", expanderFlag) } diff --git a/cluster-autoscaler/expander/grpcplugin/README.md b/cluster-autoscaler/expander/grpcplugin/README.md new file mode 100644 index 000000000000..4fd24408fd21 --- /dev/null +++ b/cluster-autoscaler/expander/grpcplugin/README.md @@ -0,0 +1,41 @@ +# gRPC Expander for Cluster Autoscaler + +## Introduction +This expander functions as a gRPC client, and will pass expansion options to an external gRPC server. +The external server will use this information to make a decision on which Node Group to expand, and return an option to expand. + +## Motivation + +This expander gives users very fine grained control over which option they'd like to expand. +The gRPC server must be implemented by the user, but the logic can be developed out of band with Cluster Autoscaler. +There are a wide variety of use cases here. Some examples are as follows: +* A tiered weighted random strategy can be implemented, instead of a static priority ladder offered by the priority expander. +* A strategy to encapsulate business logic specific to a user but not all users of Cluster Autoscaler +* A strategy to take into account the dynamic fluctuating prices of the spot instance market + +## Configuration options +As using this expander requires communication with another service, users must specify a few options as CLI arguments. + +```yaml +--grpcExpanderUrl +``` +URL of the gRPC Expander server, for CA to communicate with. +```yaml +--grpcExpanderCert +``` +Location of the volume mounted certificate of the gRPC server if it is configured to communicate over TLS + +## gRPC Expander Server Setup +The gRPC server can be set up in many ways, but a simple example is described below. +An example of a barebones gRPC Exapnder Server can be found in the `example` directory under `fake_grpc_server.go` file. This is meant to be copied elsewhere and deployed as a separate +service. Note that the `protos/expander.pb.go` generated protobuf code will also need to be copied and used to serialize/deserizle the Options passed from CA. +Communication between Cluster Autoscaler and the gRPC Server will occur over native kube-proxy. To use this, note the Service and Namespace the gRPC server is deployed in. + +Deploy the gRPC Expander Server as a separate app, listening on a specifc port number. +Start Cluster Autoscaler with the `--grpcExapnderURl=SERVICE_NAME.NAMESPACE_NAME.svc.cluster.local:PORT_NUMBER` flag, as well as `--grpcExpanderCert` pointed at the location of the volume mounted certificate of the gRPC server. + +## Details + +The gRPC client currently transforms nodeInfo objects passed into the expander to v1.Node objects to save rpc call throughput. As such, the gRPC server will not have access to daemonsets and static pods running on each node. + + diff --git a/cluster-autoscaler/expander/grpcplugin/example/fake_grpc_server.go b/cluster-autoscaler/expander/grpcplugin/example/fake_grpc_server.go new file mode 100644 index 000000000000..05fd5f47f06b --- /dev/null +++ b/cluster-autoscaler/expander/grpcplugin/example/fake_grpc_server.go @@ -0,0 +1,104 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package example + +import ( + "context" + "fmt" + "log" + "net" + + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" + "k8s.io/autoscaler/cluster-autoscaler/expander/grpcplugin/protos" +) + +// This code is meant to be used as starter code, deployed as a separate app, not in Cluster Autoscaler. +// This serves as the gRPC Expander Server counterpart to the client which lives in this repo +// main.go of said application should simply pass in paths to (optional)cert, (optional)private key, and port, and call Serve to start listening +// copy the protos/expander.pb.go to your other application's repo, so it has access to the protobuf definitions + +// Serve should be called by the main() function in main.go of the Expander Server repo to start serving +func Serve(certPath string, keyPath string, port uint) { + + var grpcServer *grpc.Server + + // If credentials are passed in, use them + if certPath != "" && keyPath != "" { + log.Printf("Using certFile: %v and keyFile: %v", certPath, keyPath) + tlsCredentials, err := credentials.NewServerTLSFromFile(certPath, keyPath) + if err != nil { + log.Fatal("cannot load TLS credentials: ", err) + } + grpcServer = grpc.NewServer(grpc.Creds(tlsCredentials)) + } else { + grpcServer = grpc.NewServer() + } + + netListener := getNetListener(port) + + expanderServerImpl := NewExpanderServerImpl() + + protos.RegisterExpanderServer(grpcServer, expanderServerImpl) + + // start the server + log.Println("Starting server on port ", port) + if err := grpcServer.Serve(netListener); err != nil { + log.Fatalf("failed to serve: %s", err) + } +} + +func getNetListener(port uint) net.Listener { + lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port)) + if err != nil { + log.Fatalf("failed to listen: %v", err) + panic(fmt.Sprintf("failed to listen: %v", err)) + } + + return lis +} + +// ExpanderServerImpl is an implementation of Expander Server from proto definition +type ExpanderServerImpl struct{} + +// NewExpanderServerImpl is this Expander's implementation of the server +func NewExpanderServerImpl() *ExpanderServerImpl { + return &ExpanderServerImpl{} +} + +// BestOptions method filters out the best options of all options passed from the gRPC Client in CA, according to the defined strategy. +func (ServerImpl *ExpanderServerImpl) BestOptions(ctx context.Context, req *protos.BestOptionsRequest) (*protos.BestOptionsResponse, error) { + opts := req.GetOptions() + log.Printf("Received BestOption Request with %v options", len(opts)) + + // This strategy simply chooses the Option with the longest NodeGroupID name, but can be replaced with any arbitrary logic + longest := 0 + var choice *protos.Option + for _, opt := range opts { + log.Println(opt.NodeGroupId) + if len(opt.NodeGroupId) > longest { + choice = opt + } + } + + log.Print("returned bestOptions with option: ", choice.NodeGroupId) + + // Return just one option for now + return &protos.BestOptionsResponse{ + Options: []*protos.Option{choice}, + }, nil +} diff --git a/cluster-autoscaler/expander/grpcplugin/example/main.go b/cluster-autoscaler/expander/grpcplugin/example/main.go new file mode 100644 index 000000000000..5401416baa01 --- /dev/null +++ b/cluster-autoscaler/expander/grpcplugin/example/main.go @@ -0,0 +1,30 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package example + +import "flag" + +func main() { + + certPath := flag.String("cert-path", "", "Path to cert file for gRPC Expander Server") + keyPath := flag.String("key-path", "", "Path to private key for gRPC Expander Server") + port := flag.Uint("port", 7000, "Port number for server to listen on") + + flag.Parse() + + Serve(*certPath, *keyPath, *port) +} diff --git a/cluster-autoscaler/expander/grpcplugin/grpc_client.go b/cluster-autoscaler/expander/grpcplugin/grpc_client.go new file mode 100644 index 000000000000..7800bd270136 --- /dev/null +++ b/cluster-autoscaler/expander/grpcplugin/grpc_client.go @@ -0,0 +1,143 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package grpcplugin + +import ( + "context" + "log" + "time" + + v1 "k8s.io/api/core/v1" + "k8s.io/autoscaler/cluster-autoscaler/expander" + "k8s.io/autoscaler/cluster-autoscaler/expander/grpcplugin/protos" + "k8s.io/klog/v2" + schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework" + + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" +) + +const gRPCTimeout = 5 * time.Second + +type grpcclientstrategy struct { + grpcClient protos.ExpanderClient +} + +// NewFilter returns an expansion filter that creates a gRPC client, and calls out to a gRPC server +func NewFilter(expanderCert string, expanderUrl string) expander.Filter { + client := createGRPCClient(expanderCert, expanderUrl) + if client == nil { + return &grpcclientstrategy{grpcClient: nil} + } + return &grpcclientstrategy{grpcClient: client} +} + +func createGRPCClient(expanderCert string, expanderUrl string) protos.ExpanderClient { + var dialOpt grpc.DialOption + + if expanderCert == "" { + log.Fatalf("GRPC Expander Cert not specified, insecure connections not allowed") + return nil + } + creds, err := credentials.NewClientTLSFromFile(expanderCert, "") + if err != nil { + log.Fatalf("Failed to create TLS credentials %v", err) + return nil + } + dialOpt = grpc.WithTransportCredentials(creds) + klog.V(2).Infof("Dialing: %s with dialopt: %v", expanderUrl, dialOpt) + conn, err := grpc.Dial(expanderUrl, dialOpt) + if err != nil { + log.Fatalf("Fail to dial server: %v", err) + return nil + } + return protos.NewExpanderClient(conn) +} + +func (g *grpcclientstrategy) BestOptions(expansionOptions []expander.Option, nodeInfo map[string]*schedulerframework.NodeInfo) []expander.Option { + if g.grpcClient == nil { + klog.Errorf("Incorrect gRPC client config, filtering no options") + return expansionOptions + } + + // Transform inputs to gRPC inputs + grpcOptionsSlice, nodeGroupIDOptionMap := populateOptionsForGRPC(expansionOptions) + grpcNodeMap := populateNodeInfoForGRPC(nodeInfo) + + // call gRPC server to get BestOption + klog.V(2).Infof("GPRC call of best options to server with %v options", len(nodeGroupIDOptionMap)) + ctx, cancel := context.WithTimeout(context.Background(), gRPCTimeout) + defer cancel() + bestOptionsResponse, err := g.grpcClient.BestOptions(ctx, &protos.BestOptionsRequest{Options: grpcOptionsSlice, NodeMap: grpcNodeMap}) + if err != nil { + klog.V(4).Info("GRPC call timed out, no options filtered") + return expansionOptions + } + + if bestOptionsResponse == nil || bestOptionsResponse.Options == nil { + klog.V(4).Info("GRPC returned nil bestOptions, no options filtered") + return expansionOptions + } + // Transform back options slice + options := transformAndSanitizeOptionsFromGRPC(bestOptionsResponse.Options, nodeGroupIDOptionMap) + if options == nil { + klog.V(4).Info("Unable to sanitize GPRC returned bestOptions, no options filtered") + return expansionOptions + } + return options +} + +// populateOptionsForGRPC creates a map of nodegroup ID and options, as well as a slice of Options objects for the gRPC call +func populateOptionsForGRPC(expansionOptions []expander.Option) ([]*protos.Option, map[string]expander.Option) { + grpcOptionsSlice := []*protos.Option{} + nodeGroupIDOptionMap := make(map[string]expander.Option) + for _, option := range expansionOptions { + nodeGroupIDOptionMap[option.NodeGroup.Id()] = option + grpcOptionsSlice = append(grpcOptionsSlice, newOptionMessage(option.NodeGroup.Id(), int32(option.NodeCount), option.Debug, option.Pods)) + } + return grpcOptionsSlice, nodeGroupIDOptionMap +} + +// populateNodeInfoForGRPC looks at the corresponding v1.Node object per NodeInfo object, and populates the grpcNodeInfoMap with these to pass over grpc +func populateNodeInfoForGRPC(nodeInfos map[string]*schedulerframework.NodeInfo) map[string]*v1.Node { + grpcNodeInfoMap := make(map[string]*v1.Node) + for nodeId, nodeInfo := range nodeInfos { + grpcNodeInfoMap[nodeId] = nodeInfo.Node() + } + return grpcNodeInfoMap +} + +func transformAndSanitizeOptionsFromGRPC(bestOptionsResponseOptions []*protos.Option, nodeGroupIDOptionMap map[string]expander.Option) []expander.Option { + var options []expander.Option + for _, option := range bestOptionsResponseOptions { + if option == nil { + klog.Errorf("GRPC server returned nil Option") + continue + } + if _, ok := nodeGroupIDOptionMap[option.NodeGroupId]; ok { + options = append(options, nodeGroupIDOptionMap[option.NodeGroupId]) + } else { + klog.Errorf("GRPC server returned invalid nodeGroup ID: ", option.NodeGroupId) + continue + } + } + return options +} + +func newOptionMessage(nodeGroupId string, nodeCount int32, debug string, pods []*v1.Pod) *protos.Option { + return &protos.Option{NodeGroupId: nodeGroupId, NodeCount: nodeCount, Debug: debug, Pod: pods} +} diff --git a/cluster-autoscaler/expander/grpcplugin/grpc_client_test.go b/cluster-autoscaler/expander/grpcplugin/grpc_client_test.go new file mode 100644 index 000000000000..65b94a17d54b --- /dev/null +++ b/cluster-autoscaler/expander/grpcplugin/grpc_client_test.go @@ -0,0 +1,276 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package grpcplugin + +import ( + "errors" + "testing" + + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" + v1 "k8s.io/api/core/v1" + "k8s.io/autoscaler/cluster-autoscaler/expander/grpcplugin/protos" + "k8s.io/autoscaler/cluster-autoscaler/expander/mocks" + . "k8s.io/autoscaler/cluster-autoscaler/utils/test" + schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework" + + "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/test" + "k8s.io/autoscaler/cluster-autoscaler/expander" + + _ "github.com/golang/mock/mockgen/model" +) + +var ( + nodes = []*v1.Node{ + BuildTestNode("n1", 1000, 1000), + BuildTestNode("n2", 1000, 1000), + BuildTestNode("n3", 1000, 1000), + BuildTestNode("n4", 1000, 1000), + } + + eoT2Micro = expander.Option{ + Debug: "t2.micro", + NodeGroup: test.NewTestNodeGroup("my-asg.t2.micro", 10, 1, 1, true, false, "t2.micro", nil, nil), + } + eoT2Large = expander.Option{ + Debug: "t2.large", + NodeGroup: test.NewTestNodeGroup("my-asg.t2.large", 10, 1, 1, true, false, "t2.large", nil, nil), + } + eoT3Large = expander.Option{ + Debug: "t3.large", + NodeGroup: test.NewTestNodeGroup("my-asg.t3.large", 10, 1, 1, true, false, "t3.large", nil, nil), + } + eoM44XLarge = expander.Option{ + Debug: "m4.4xlarge", + NodeGroup: test.NewTestNodeGroup("my-asg.m4.4xlarge", 10, 1, 1, true, false, "m4.4xlarge", nil, nil), + } + options = []expander.Option{eoT2Micro, eoT2Large, eoT3Large, eoM44XLarge} + + grpcEoT2Micro = protos.Option{ + NodeGroupId: eoT2Micro.NodeGroup.Id(), + NodeCount: int32(eoT2Micro.NodeCount), + Debug: eoT2Micro.Debug, + Pod: eoT2Micro.Pods, + } + grpcEoT2Large = protos.Option{ + NodeGroupId: eoT2Large.NodeGroup.Id(), + NodeCount: int32(eoT2Large.NodeCount), + Debug: eoT2Large.Debug, + Pod: eoT2Large.Pods, + } + grpcEoT3Large = protos.Option{ + NodeGroupId: eoT3Large.NodeGroup.Id(), + NodeCount: int32(eoT3Large.NodeCount), + Debug: eoT3Large.Debug, + Pod: eoT3Large.Pods, + } + grpcEoM44XLarge = protos.Option{ + NodeGroupId: eoM44XLarge.NodeGroup.Id(), + NodeCount: int32(eoM44XLarge.NodeCount), + Debug: eoM44XLarge.Debug, + Pod: eoM44XLarge.Pods, + } +) + +func TestPopulateOptionsForGrpc(t *testing.T) { + testCases := []struct { + desc string + opts []expander.Option + expectedOpts []*protos.Option + expectedMap map[string]expander.Option + }{ + { + desc: "empty options", + opts: []expander.Option{}, + expectedOpts: []*protos.Option{}, + expectedMap: map[string]expander.Option{}, + }, + { + desc: "one option", + opts: []expander.Option{eoT2Micro}, + expectedOpts: []*protos.Option{&grpcEoT2Micro}, + expectedMap: map[string]expander.Option{eoT2Micro.NodeGroup.Id(): eoT2Micro}, + }, + { + desc: "many options", + opts: options, + expectedOpts: []*protos.Option{&grpcEoT2Micro, &grpcEoT2Large, &grpcEoT3Large, &grpcEoM44XLarge}, + expectedMap: map[string]expander.Option{ + eoT2Micro.NodeGroup.Id(): eoT2Micro, + eoT2Large.NodeGroup.Id(): eoT2Large, + eoT3Large.NodeGroup.Id(): eoT3Large, + eoM44XLarge.NodeGroup.Id(): eoM44XLarge, + }, + }, + } + for _, tc := range testCases { + grpcOptionsSlice, nodeGroupIDOptionMap := populateOptionsForGRPC(tc.opts) + assert.Equal(t, tc.expectedOpts, grpcOptionsSlice) + assert.Equal(t, tc.expectedMap, nodeGroupIDOptionMap) + } +} + +func makeFakeNodeInfos() map[string]*schedulerframework.NodeInfo { + nodeInfos := make(map[string]*schedulerframework.NodeInfo) + for i, opt := range options { + nodeInfo := schedulerframework.NewNodeInfo() + nodeInfo.SetNode(nodes[i]) + nodeInfos[opt.NodeGroup.Id()] = nodeInfo + } + return nodeInfos +} + +func TestPopulateNodeInfoForGRPC(t *testing.T) { + nodeInfos := makeFakeNodeInfos() + grpcNodeInfoMap := populateNodeInfoForGRPC(nodeInfos) + + expectedGrpcNodeInfoMap := make(map[string]*v1.Node) + for i, opt := range options { + expectedGrpcNodeInfoMap[opt.NodeGroup.Id()] = nodes[i] + } + assert.Equal(t, expectedGrpcNodeInfoMap, grpcNodeInfoMap) +} + +func TestValidTransformAndSanitizeOptionsFromGRPC(t *testing.T) { + responseOptionsSlice := []*protos.Option{&grpcEoT2Micro, &grpcEoT3Large, &grpcEoM44XLarge} + nodeGroupIDOptionMap := map[string]expander.Option{ + eoT2Micro.NodeGroup.Id(): eoT2Micro, + eoT2Large.NodeGroup.Id(): eoT2Large, + eoT3Large.NodeGroup.Id(): eoT3Large, + eoM44XLarge.NodeGroup.Id(): eoM44XLarge, + } + + expectedOptions := []expander.Option{eoT2Micro, eoT3Large, eoM44XLarge} + + ret := transformAndSanitizeOptionsFromGRPC(responseOptionsSlice, nodeGroupIDOptionMap) + assert.Equal(t, expectedOptions, ret) +} + +func TestAnInvalidTransformAndSanitizeOptionsFromGRPC(t *testing.T) { + responseOptionsSlice := []*protos.Option{&grpcEoT2Micro, &grpcEoT3Large, &grpcEoM44XLarge} + nodeGroupIDOptionMap := map[string]expander.Option{ + eoT2Micro.NodeGroup.Id(): eoT2Micro, + eoT2Large.NodeGroup.Id(): eoT2Large, + eoT3Large.NodeGroup.Id(): eoT3Large, + } + + ret := transformAndSanitizeOptionsFromGRPC(responseOptionsSlice, nodeGroupIDOptionMap) + assert.Equal(t, []expander.Option{eoT2Micro, eoT3Large}, ret) +} + +func TestBestOptionsValid(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + mockClient := mocks.NewMockExpanderClient(ctrl) + g := &grpcclientstrategy{mockClient} + + nodeInfos := makeFakeNodeInfos() + grpcNodeInfoMap := make(map[string]*v1.Node) + for i, opt := range options { + grpcNodeInfoMap[opt.NodeGroup.Id()] = nodes[i] + } + expectedBestOptionsReq := &protos.BestOptionsRequest{ + Options: []*protos.Option{&grpcEoT2Micro, &grpcEoT2Large, &grpcEoT3Large, &grpcEoM44XLarge}, + NodeMap: grpcNodeInfoMap, + } + + mockClient.EXPECT().BestOptions( + gomock.Any(), gomock.Eq(expectedBestOptionsReq), + ).Return(&protos.BestOptionsResponse{Options: []*protos.Option{&grpcEoT3Large}}, nil) + + resp := g.BestOptions(options, nodeInfos) + + assert.Equal(t, resp, []expander.Option{eoT3Large}) +} + +// All test cases should error, and no options should be filtered +func TestBestOptionsErrors(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + mockClient := mocks.NewMockExpanderClient(ctrl) + g := grpcclientstrategy{mockClient} + + badProtosOption := protos.Option{ + NodeGroupId: "badID", + NodeCount: int32(eoM44XLarge.NodeCount), + Debug: eoM44XLarge.Debug, + Pod: eoM44XLarge.Pods, + } + + testCases := []struct { + desc string + client grpcclientstrategy + nodeInfo map[string]*schedulerframework.NodeInfo + mockResponse protos.BestOptionsResponse + errResponse error + }{ + { + desc: "Bad gRPC client config", + client: grpcclientstrategy{nil}, + nodeInfo: makeFakeNodeInfos(), + mockResponse: protos.BestOptionsResponse{}, + errResponse: nil, + }, + { + desc: "gRPC error response", + client: g, + nodeInfo: makeFakeNodeInfos(), + mockResponse: protos.BestOptionsResponse{}, + errResponse: errors.New("timeout error"), + }, + { + desc: "bad bestOptions response", + client: g, + nodeInfo: makeFakeNodeInfos(), + mockResponse: protos.BestOptionsResponse{}, + errResponse: nil, + }, + { + desc: "bad bestOptions response, options nil", + client: g, + nodeInfo: makeFakeNodeInfos(), + mockResponse: protos.BestOptionsResponse{Options: nil}, + errResponse: nil, + }, + { + desc: "bad bestOptions response, options invalid - nil", + client: g, + nodeInfo: makeFakeNodeInfos(), + mockResponse: protos.BestOptionsResponse{Options: []*protos.Option{&grpcEoT2Micro, nil, &grpcEoT2Large, &grpcEoT3Large, &grpcEoM44XLarge}}, + errResponse: nil, + }, + { + desc: "bad bestOptions response, options invalid - nonExistent nodeID", + client: g, + nodeInfo: makeFakeNodeInfos(), + mockResponse: protos.BestOptionsResponse{Options: []*protos.Option{&grpcEoT2Micro, &badProtosOption, &grpcEoT2Large, &grpcEoT3Large, &grpcEoM44XLarge}}, + errResponse: nil, + }, + } + for _, tc := range testCases { + grpcNodeInfoMap := populateNodeInfoForGRPC(tc.nodeInfo) + mockClient.EXPECT().BestOptions( + gomock.Any(), gomock.Eq( + &protos.BestOptionsRequest{ + Options: []*protos.Option{&grpcEoT2Micro, &grpcEoT2Large, &grpcEoT3Large, &grpcEoM44XLarge}, + NodeMap: grpcNodeInfoMap, + })).Return(&tc.mockResponse, tc.errResponse) + resp := g.BestOptions(options, tc.nodeInfo) + + assert.Equal(t, resp, options) + } +} diff --git a/cluster-autoscaler/expander/grpcplugin/protos/expander.pb.go b/cluster-autoscaler/expander/grpcplugin/protos/expander.pb.go new file mode 100644 index 000000000000..3c071d2f37d7 --- /dev/null +++ b/cluster-autoscaler/expander/grpcplugin/protos/expander.pb.go @@ -0,0 +1,440 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package protos + +import ( + context "context" + reflect "reflect" + sync "sync" + + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + v1 "k8s.io/api/core/v1" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type BestOptionsRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Options []*Option `protobuf:"bytes,1,rep,name=options,proto3" json:"options,omitempty"` + // key is node id from options + NodeMap map[string]*v1.Node `protobuf:"bytes,2,rep,name=nodeMap,proto3" json:"nodeMap,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` +} + +func (x *BestOptionsRequest) Reset() { + *x = BestOptionsRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_cluster_autoscaler_expander_grpcplugin_protos_expander_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *BestOptionsRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*BestOptionsRequest) ProtoMessage() {} + +func (x *BestOptionsRequest) ProtoReflect() protoreflect.Message { + mi := &file_cluster_autoscaler_expander_grpcplugin_protos_expander_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use BestOptionsRequest.ProtoReflect.Descriptor instead. +func (*BestOptionsRequest) Descriptor() ([]byte, []int) { + return file_cluster_autoscaler_expander_grpcplugin_protos_expander_proto_rawDescGZIP(), []int{0} +} + +func (x *BestOptionsRequest) GetOptions() []*Option { + if x != nil { + return x.Options + } + return nil +} + +func (x *BestOptionsRequest) GetNodeMap() map[string]*v1.Node { + if x != nil { + return x.NodeMap + } + return nil +} + +type BestOptionsResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Options []*Option `protobuf:"bytes,1,rep,name=options,proto3" json:"options,omitempty"` +} + +func (x *BestOptionsResponse) Reset() { + *x = BestOptionsResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_cluster_autoscaler_expander_grpcplugin_protos_expander_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *BestOptionsResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*BestOptionsResponse) ProtoMessage() {} + +func (x *BestOptionsResponse) ProtoReflect() protoreflect.Message { + mi := &file_cluster_autoscaler_expander_grpcplugin_protos_expander_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use BestOptionsResponse.ProtoReflect.Descriptor instead. +func (*BestOptionsResponse) Descriptor() ([]byte, []int) { + return file_cluster_autoscaler_expander_grpcplugin_protos_expander_proto_rawDescGZIP(), []int{1} +} + +func (x *BestOptionsResponse) GetOptions() []*Option { + if x != nil { + return x.Options + } + return nil +} + +type Option struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // only need the ID of node to uniquely identify the nodeGroup, used in the nodeInfo map. + NodeGroupId string `protobuf:"bytes,1,opt,name=nodeGroupId,proto3" json:"nodeGroupId,omitempty"` + NodeCount int32 `protobuf:"varint,2,opt,name=nodeCount,proto3" json:"nodeCount,omitempty"` + Debug string `protobuf:"bytes,3,opt,name=debug,proto3" json:"debug,omitempty"` + Pod []*v1.Pod `protobuf:"bytes,4,rep,name=pod,proto3" json:"pod,omitempty"` +} + +func (x *Option) Reset() { + *x = Option{} + if protoimpl.UnsafeEnabled { + mi := &file_cluster_autoscaler_expander_grpcplugin_protos_expander_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Option) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Option) ProtoMessage() {} + +func (x *Option) ProtoReflect() protoreflect.Message { + mi := &file_cluster_autoscaler_expander_grpcplugin_protos_expander_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Option.ProtoReflect.Descriptor instead. +func (*Option) Descriptor() ([]byte, []int) { + return file_cluster_autoscaler_expander_grpcplugin_protos_expander_proto_rawDescGZIP(), []int{2} +} + +func (x *Option) GetNodeGroupId() string { + if x != nil { + return x.NodeGroupId + } + return "" +} + +func (x *Option) GetNodeCount() int32 { + if x != nil { + return x.NodeCount + } + return 0 +} + +func (x *Option) GetDebug() string { + if x != nil { + return x.Debug + } + return "" +} + +func (x *Option) GetPod() []*v1.Pod { + if x != nil { + return x.Pod + } + return nil +} + +var File_cluster_autoscaler_expander_grpcplugin_protos_expander_proto protoreflect.FileDescriptor + +var file_cluster_autoscaler_expander_grpcplugin_protos_expander_proto_rawDesc = []byte{ + 0x0a, 0x3c, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x2d, 0x61, 0x75, 0x74, 0x6f, 0x73, 0x63, + 0x61, 0x6c, 0x65, 0x72, 0x2f, 0x65, 0x78, 0x70, 0x61, 0x6e, 0x64, 0x65, 0x72, 0x2f, 0x67, 0x72, + 0x70, 0x63, 0x70, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x73, 0x2f, + 0x65, 0x78, 0x70, 0x61, 0x6e, 0x64, 0x65, 0x72, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x0a, + 0x67, 0x72, 0x70, 0x63, 0x70, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x1a, 0x22, 0x6b, 0x38, 0x73, 0x2e, + 0x69, 0x6f, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x63, 0x6f, 0x72, 0x65, 0x2f, 0x76, 0x31, 0x2f, 0x67, + 0x65, 0x6e, 0x65, 0x72, 0x61, 0x74, 0x65, 0x64, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xdf, + 0x01, 0x0a, 0x12, 0x42, 0x65, 0x73, 0x74, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, + 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x2c, 0x0a, 0x07, 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, + 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x70, 0x6c, 0x75, + 0x67, 0x69, 0x6e, 0x2e, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x07, 0x6f, 0x70, 0x74, 0x69, + 0x6f, 0x6e, 0x73, 0x12, 0x45, 0x0a, 0x07, 0x6e, 0x6f, 0x64, 0x65, 0x4d, 0x61, 0x70, 0x18, 0x02, + 0x20, 0x03, 0x28, 0x0b, 0x32, 0x2b, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x70, 0x6c, 0x75, 0x67, 0x69, + 0x6e, 0x2e, 0x42, 0x65, 0x73, 0x74, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x71, + 0x75, 0x65, 0x73, 0x74, 0x2e, 0x4e, 0x6f, 0x64, 0x65, 0x4d, 0x61, 0x70, 0x45, 0x6e, 0x74, 0x72, + 0x79, 0x52, 0x07, 0x6e, 0x6f, 0x64, 0x65, 0x4d, 0x61, 0x70, 0x1a, 0x54, 0x0a, 0x0c, 0x4e, 0x6f, + 0x64, 0x65, 0x4d, 0x61, 0x70, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, + 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x2e, 0x0a, 0x05, + 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x18, 0x2e, 0x6b, 0x38, + 0x73, 0x2e, 0x69, 0x6f, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x76, 0x31, + 0x2e, 0x4e, 0x6f, 0x64, 0x65, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, + 0x22, 0x43, 0x0a, 0x13, 0x42, 0x65, 0x73, 0x74, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, + 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x2c, 0x0a, 0x07, 0x6f, 0x70, 0x74, 0x69, 0x6f, + 0x6e, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x70, + 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x2e, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x07, 0x6f, 0x70, + 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x22, 0x89, 0x01, 0x0a, 0x06, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, + 0x12, 0x20, 0x0a, 0x0b, 0x6e, 0x6f, 0x64, 0x65, 0x47, 0x72, 0x6f, 0x75, 0x70, 0x49, 0x64, 0x18, + 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x6e, 0x6f, 0x64, 0x65, 0x47, 0x72, 0x6f, 0x75, 0x70, + 0x49, 0x64, 0x12, 0x1c, 0x0a, 0x09, 0x6e, 0x6f, 0x64, 0x65, 0x43, 0x6f, 0x75, 0x6e, 0x74, 0x18, + 0x02, 0x20, 0x01, 0x28, 0x05, 0x52, 0x09, 0x6e, 0x6f, 0x64, 0x65, 0x43, 0x6f, 0x75, 0x6e, 0x74, + 0x12, 0x14, 0x0a, 0x05, 0x64, 0x65, 0x62, 0x75, 0x67, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x05, 0x64, 0x65, 0x62, 0x75, 0x67, 0x12, 0x29, 0x0a, 0x03, 0x70, 0x6f, 0x64, 0x18, 0x04, 0x20, + 0x03, 0x28, 0x0b, 0x32, 0x17, 0x2e, 0x6b, 0x38, 0x73, 0x2e, 0x69, 0x6f, 0x2e, 0x61, 0x70, 0x69, + 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x50, 0x6f, 0x64, 0x52, 0x03, 0x70, 0x6f, + 0x64, 0x32, 0x5c, 0x0a, 0x08, 0x45, 0x78, 0x70, 0x61, 0x6e, 0x64, 0x65, 0x72, 0x12, 0x50, 0x0a, + 0x0b, 0x42, 0x65, 0x73, 0x74, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x1e, 0x2e, 0x67, + 0x72, 0x70, 0x63, 0x70, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x2e, 0x42, 0x65, 0x73, 0x74, 0x4f, 0x70, + 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1f, 0x2e, 0x67, + 0x72, 0x70, 0x63, 0x70, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x2e, 0x42, 0x65, 0x73, 0x74, 0x4f, 0x70, + 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x42, + 0x2f, 0x5a, 0x2d, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x2d, 0x61, 0x75, 0x74, 0x6f, 0x73, + 0x63, 0x61, 0x6c, 0x65, 0x72, 0x2f, 0x65, 0x78, 0x70, 0x61, 0x6e, 0x64, 0x65, 0x72, 0x2f, 0x67, + 0x72, 0x70, 0x63, 0x70, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x73, + 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_cluster_autoscaler_expander_grpcplugin_protos_expander_proto_rawDescOnce sync.Once + file_cluster_autoscaler_expander_grpcplugin_protos_expander_proto_rawDescData = file_cluster_autoscaler_expander_grpcplugin_protos_expander_proto_rawDesc +) + +func file_cluster_autoscaler_expander_grpcplugin_protos_expander_proto_rawDescGZIP() []byte { + file_cluster_autoscaler_expander_grpcplugin_protos_expander_proto_rawDescOnce.Do(func() { + file_cluster_autoscaler_expander_grpcplugin_protos_expander_proto_rawDescData = protoimpl.X.CompressGZIP(file_cluster_autoscaler_expander_grpcplugin_protos_expander_proto_rawDescData) + }) + return file_cluster_autoscaler_expander_grpcplugin_protos_expander_proto_rawDescData +} + +var file_cluster_autoscaler_expander_grpcplugin_protos_expander_proto_msgTypes = make([]protoimpl.MessageInfo, 4) +var file_cluster_autoscaler_expander_grpcplugin_protos_expander_proto_goTypes = []interface{}{ + (*BestOptionsRequest)(nil), // 0: grpcplugin.BestOptionsRequest + (*BestOptionsResponse)(nil), // 1: grpcplugin.BestOptionsResponse + (*Option)(nil), // 2: grpcplugin.Option + nil, // 3: grpcplugin.BestOptionsRequest.NodeMapEntry + (*v1.Pod)(nil), // 4: k8s.io.api.core.v1.Pod + (*v1.Node)(nil), // 5: k8s.io.api.core.v1.Node +} +var file_cluster_autoscaler_expander_grpcplugin_protos_expander_proto_depIdxs = []int32{ + 2, // 0: grpcplugin.BestOptionsRequest.options:type_name -> grpcplugin.Option + 3, // 1: grpcplugin.BestOptionsRequest.nodeMap:type_name -> grpcplugin.BestOptionsRequest.NodeMapEntry + 2, // 2: grpcplugin.BestOptionsResponse.options:type_name -> grpcplugin.Option + 4, // 3: grpcplugin.Option.pod:type_name -> k8s.io.api.core.v1.Pod + 5, // 4: grpcplugin.BestOptionsRequest.NodeMapEntry.value:type_name -> k8s.io.api.core.v1.Node + 0, // 5: grpcplugin.Expander.BestOptions:input_type -> grpcplugin.BestOptionsRequest + 1, // 6: grpcplugin.Expander.BestOptions:output_type -> grpcplugin.BestOptionsResponse + 6, // [6:7] is the sub-list for method output_type + 5, // [5:6] is the sub-list for method input_type + 5, // [5:5] is the sub-list for extension type_name + 5, // [5:5] is the sub-list for extension extendee + 0, // [0:5] is the sub-list for field type_name +} + +func init() { file_cluster_autoscaler_expander_grpcplugin_protos_expander_proto_init() } +func file_cluster_autoscaler_expander_grpcplugin_protos_expander_proto_init() { + if File_cluster_autoscaler_expander_grpcplugin_protos_expander_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_cluster_autoscaler_expander_grpcplugin_protos_expander_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*BestOptionsRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_cluster_autoscaler_expander_grpcplugin_protos_expander_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*BestOptionsResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_cluster_autoscaler_expander_grpcplugin_protos_expander_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Option); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_cluster_autoscaler_expander_grpcplugin_protos_expander_proto_rawDesc, + NumEnums: 0, + NumMessages: 4, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_cluster_autoscaler_expander_grpcplugin_protos_expander_proto_goTypes, + DependencyIndexes: file_cluster_autoscaler_expander_grpcplugin_protos_expander_proto_depIdxs, + MessageInfos: file_cluster_autoscaler_expander_grpcplugin_protos_expander_proto_msgTypes, + }.Build() + File_cluster_autoscaler_expander_grpcplugin_protos_expander_proto = out.File + file_cluster_autoscaler_expander_grpcplugin_protos_expander_proto_rawDesc = nil + file_cluster_autoscaler_expander_grpcplugin_protos_expander_proto_goTypes = nil + file_cluster_autoscaler_expander_grpcplugin_protos_expander_proto_depIdxs = nil +} + +// Reference imports to suppress errors if they are not otherwise used. +var _ context.Context +var _ grpc.ClientConnInterface + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +const _ = grpc.SupportPackageIsVersion6 + +// ExpanderClient is the client API for Expander service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. +type ExpanderClient interface { + BestOptions(ctx context.Context, in *BestOptionsRequest, opts ...grpc.CallOption) (*BestOptionsResponse, error) +} + +type expanderClient struct { + cc grpc.ClientConnInterface +} + +func NewExpanderClient(cc grpc.ClientConnInterface) ExpanderClient { + return &expanderClient{cc} +} + +func (c *expanderClient) BestOptions(ctx context.Context, in *BestOptionsRequest, opts ...grpc.CallOption) (*BestOptionsResponse, error) { + out := new(BestOptionsResponse) + err := c.cc.Invoke(ctx, "/grpcplugin.Expander/BestOptions", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// ExpanderServer is the server API for Expander service. +type ExpanderServer interface { + BestOptions(context.Context, *BestOptionsRequest) (*BestOptionsResponse, error) +} + +// UnimplementedExpanderServer can be embedded to have forward compatible implementations. +type UnimplementedExpanderServer struct { +} + +func (*UnimplementedExpanderServer) BestOptions(context.Context, *BestOptionsRequest) (*BestOptionsResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method BestOptions not implemented") +} + +func RegisterExpanderServer(s *grpc.Server, srv ExpanderServer) { + s.RegisterService(&_Expander_serviceDesc, srv) +} + +func _Expander_BestOptions_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(BestOptionsRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(ExpanderServer).BestOptions(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/grpcplugin.Expander/BestOptions", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(ExpanderServer).BestOptions(ctx, req.(*BestOptionsRequest)) + } + return interceptor(ctx, in, info, handler) +} + +var _Expander_serviceDesc = grpc.ServiceDesc{ + ServiceName: "grpcplugin.Expander", + HandlerType: (*ExpanderServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "BestOptions", + Handler: _Expander_BestOptions_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "cluster-autoscaler/expander/grpcplugin/protos/expander.proto", +} diff --git a/cluster-autoscaler/expander/grpcplugin/protos/expander.proto b/cluster-autoscaler/expander/grpcplugin/protos/expander.proto new file mode 100644 index 000000000000..5a08e8ff301b --- /dev/null +++ b/cluster-autoscaler/expander/grpcplugin/protos/expander.proto @@ -0,0 +1,30 @@ +syntax = "proto3"; + +package grpcplugin; +import "k8s.io/api/core/v1/generated.proto"; +option go_package = "cluster-autoscaler/expander/grpcplugin/protos"; + + + +// Interface for Expander +service Expander { + + rpc BestOptions (BestOptionsRequest) + returns (BestOptionsResponse) {} +} + +message BestOptionsRequest { + repeated Option options = 1; + // key is node id from options + mapnodeMap = 2; +} +message BestOptionsResponse { + repeated Option options = 1; +} +message Option { + // only need the ID of node to uniquely identify the nodeGroup, used in the nodeInfo map. + string nodeGroupId = 1; + int32 nodeCount = 2; + string debug = 3; + repeated k8s.io.api.core.v1.Pod pod = 4; +} diff --git a/cluster-autoscaler/expander/mocks/GRPCPluginExpander.go b/cluster-autoscaler/expander/mocks/GRPCPluginExpander.go new file mode 100644 index 000000000000..358fcfcc857e --- /dev/null +++ b/cluster-autoscaler/expander/mocks/GRPCPluginExpander.go @@ -0,0 +1,107 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package mocks + +import ( + context "context" + reflect "reflect" + + gomock "github.com/golang/mock/gomock" + grpc "google.golang.org/grpc" + "k8s.io/autoscaler/cluster-autoscaler/expander/grpcplugin/protos" +) + +// MockExpanderClient is a mock of ExpanderClient interface. +type MockExpanderClient struct { + ctrl *gomock.Controller + recorder *MockExpanderClientMockRecorder +} + +// MockExpanderClientMockRecorder is the mock recorder for MockExpanderClient. +type MockExpanderClientMockRecorder struct { + mock *MockExpanderClient +} + +// NewMockExpanderClient creates a new mock instance. +func NewMockExpanderClient(ctrl *gomock.Controller) *MockExpanderClient { + mock := &MockExpanderClient{ctrl: ctrl} + mock.recorder = &MockExpanderClientMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockExpanderClient) EXPECT() *MockExpanderClientMockRecorder { + return m.recorder +} + +// BestOptions mocks base method. +func (m *MockExpanderClient) BestOptions(ctx context.Context, in *protos.BestOptionsRequest, opts ...grpc.CallOption) (*protos.BestOptionsResponse, error) { + m.ctrl.T.Helper() + varargs := []interface{}{ctx, in} + for _, a := range opts { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "BestOptions", varargs...) + ret0, _ := ret[0].(*protos.BestOptionsResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// BestOptions indicates an expected call of BestOptions. +func (mr *MockExpanderClientMockRecorder) BestOptions(ctx, in interface{}, opts ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{ctx, in}, opts...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BestOptions", reflect.TypeOf((*MockExpanderClient)(nil).BestOptions), varargs...) +} + +// MockExpanderServer is a mock of ExpanderServer interface. +type MockExpanderServer struct { + ctrl *gomock.Controller + recorder *MockExpanderServerMockRecorder +} + +// MockExpanderServerMockRecorder is the mock recorder for MockExpanderServer. +type MockExpanderServerMockRecorder struct { + mock *MockExpanderServer +} + +// NewMockExpanderServer creates a new mock instance. +func NewMockExpanderServer(ctrl *gomock.Controller) *MockExpanderServer { + mock := &MockExpanderServer{ctrl: ctrl} + mock.recorder = &MockExpanderServerMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockExpanderServer) EXPECT() *MockExpanderServerMockRecorder { + return m.recorder +} + +// BestOptions mocks base method. +func (m *MockExpanderServer) BestOptions(arg0 context.Context, arg1 *protos.BestOptionsRequest) (*protos.BestOptionsResponse, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "BestOptions", arg0, arg1) + ret0, _ := ret[0].(*protos.BestOptionsResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// BestOptions indicates an expected call of BestOptions. +func (mr *MockExpanderServerMockRecorder) BestOptions(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BestOptions", reflect.TypeOf((*MockExpanderServer)(nil).BestOptions), arg0, arg1) +} diff --git a/cluster-autoscaler/go.mod b/cluster-autoscaler/go.mod index 938f21caf4d6..c93bd85021d8 100644 --- a/cluster-autoscaler/go.mod +++ b/cluster-autoscaler/go.mod @@ -26,6 +26,8 @@ require ( golang.org/x/crypto v0.0.0-20210921155107-089bfa567519 golang.org/x/oauth2 v0.0.0-20210819190943-2bc19b11175f google.golang.org/api v0.46.0 + google.golang.org/grpc v1.40.0 + google.golang.org/protobuf v1.27.1 gopkg.in/gcfg.v1 v1.2.0 gopkg.in/yaml.v2 v2.4.0 k8s.io/api v0.24.0-alpha.2 diff --git a/cluster-autoscaler/main.go b/cluster-autoscaler/main.go index 96443b952f4d..2636bda44d1c 100644 --- a/cluster-autoscaler/main.go +++ b/cluster-autoscaler/main.go @@ -46,6 +46,7 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/metrics" ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors" "k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupset" + "k8s.io/autoscaler/cluster-autoscaler/processors/nodeinfosprovider" "k8s.io/autoscaler/cluster-autoscaler/simulator" "k8s.io/autoscaler/cluster-autoscaler/utils/errors" kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes" @@ -154,6 +155,9 @@ var ( expanderFlag = flag.String("expander", expander.RandomExpanderName, "Type of node group expander to be used in scale up. Available values: ["+strings.Join(expander.AvailableExpanders, ",")+"]. Specifying multiple values separated by commas will call the expanders in succession until there is only one option remaining. Ties still existing after this process are broken randomly.") + grpcExpanderCert = flag.String("grpc-expander-cert", "", "Path to cert used by gRPC server over TLS") + grpcExpanderURL = flag.String("grpc-expander-url", "", "URL to reach gRPC expander server.") + ignoreDaemonSetsUtilization = flag.Bool("ignore-daemonsets-utilization", false, "Should CA ignore DaemonSet pods when calculating resource utilization for scaling down") ignoreMirrorPodsUtilization = flag.Bool("ignore-mirror-pods-utilization", false, @@ -185,6 +189,7 @@ var ( emitPerNodeGroupMetrics = flag.Bool("emit-per-nodegroup-metrics", false, "If true, emit per node group metrics.") debuggingSnapshotEnabled = flag.Bool("debugging-snapshot-enabled", false, "Whether the debugging snapshot of cluster autoscaler feature is enabled") + nodeInfoCacheExpireTime = flag.Duration("node-info-cache-expire-time", 87600*time.Hour, "Node Info cache expire time for each item. Default value is 10 years.") ) func createAutoscalingOptions() config.AutoscalingOptions { @@ -219,6 +224,8 @@ func createAutoscalingOptions() config.AutoscalingOptions { ScaleUpFromZero: *scaleUpFromZero, EstimatorName: *estimatorFlag, ExpanderNames: *expanderFlag, + GRPCExpanderCert: *grpcExpanderCert, + GRPCExpanderURL: *grpcExpanderURL, IgnoreDaemonSetsUtilization: *ignoreDaemonSetsUtilization, IgnoreMirrorPodsUtilization: *ignoreMirrorPodsUtilization, MaxBulkSoftTaintCount: *maxBulkSoftTaintCount, @@ -322,6 +329,7 @@ func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter } opts.Processors = ca_processors.DefaultProcessors() + opts.Processors.TemplateNodeInfoProvider = nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nodeInfoCacheExpireTime) opts.Processors.PodListProcessor = core.NewFilterOutSchedulablePodListProcessor() nodeInfoComparatorBuilder := nodegroupset.CreateGenericNodeInfoComparator diff --git a/cluster-autoscaler/processors/nodeinfosprovider/mixed_nodeinfos_processor.go b/cluster-autoscaler/processors/nodeinfosprovider/mixed_nodeinfos_processor.go index 37ac80fd0b8b..78ff16f3cc90 100644 --- a/cluster-autoscaler/processors/nodeinfosprovider/mixed_nodeinfos_processor.go +++ b/cluster-autoscaler/processors/nodeinfosprovider/mixed_nodeinfos_processor.go @@ -35,20 +35,36 @@ import ( ) const stabilizationDelay = 1 * time.Minute +const maxCacheExpireTime = 87660 * time.Hour + +type cacheItem struct { + *schedulerframework.NodeInfo + added time.Time +} // MixedTemplateNodeInfoProvider build nodeInfos from the cluster's nodes and node groups. type MixedTemplateNodeInfoProvider struct { - nodeInfoCache map[string]*schedulerframework.NodeInfo + nodeInfoCache map[string]cacheItem + ttl time.Duration } // NewMixedTemplateNodeInfoProvider returns a NodeInfoProvider processor building // NodeInfos from real-world nodes when available, otherwise from node groups templates. -func NewMixedTemplateNodeInfoProvider() *MixedTemplateNodeInfoProvider { +func NewMixedTemplateNodeInfoProvider(t *time.Duration) *MixedTemplateNodeInfoProvider { + ttl := maxCacheExpireTime + if t != nil { + ttl = *t + } return &MixedTemplateNodeInfoProvider{ - nodeInfoCache: make(map[string]*schedulerframework.NodeInfo), + nodeInfoCache: make(map[string]cacheItem), + ttl: ttl, } } +func (p *MixedTemplateNodeInfoProvider) isCacheItemExpired(added time.Time) bool { + return time.Now().Sub(added) > p.ttl +} + // CleanUp cleans up processor's internal structures. func (p *MixedTemplateNodeInfoProvider) CleanUp() { } @@ -102,7 +118,7 @@ func (p *MixedTemplateNodeInfoProvider) Process(ctx *context.AutoscalingContext, } if added && p.nodeInfoCache != nil { if nodeInfoCopy, err := utils.DeepCopyNodeInfo(result[id]); err == nil { - p.nodeInfoCache[id] = nodeInfoCopy + p.nodeInfoCache[id] = cacheItem{NodeInfo: nodeInfoCopy, added: time.Now()} } } } @@ -115,8 +131,10 @@ func (p *MixedTemplateNodeInfoProvider) Process(ctx *context.AutoscalingContext, // No good template, check cache of previously running nodes. if p.nodeInfoCache != nil { - if nodeInfo, found := p.nodeInfoCache[id]; found { - if nodeInfoCopy, err := utils.DeepCopyNodeInfo(nodeInfo); err == nil { + if cacheItem, found := p.nodeInfoCache[id]; found { + if p.isCacheItemExpired(cacheItem.added) { + delete(p.nodeInfoCache, id) + } else if nodeInfoCopy, err := utils.DeepCopyNodeInfo(cacheItem.NodeInfo); err == nil { result[id] = nodeInfoCopy continue } diff --git a/cluster-autoscaler/processors/nodeinfosprovider/mixed_nodeinfos_processor_test.go b/cluster-autoscaler/processors/nodeinfosprovider/mixed_nodeinfos_processor_test.go index 77e8f58fa45a..bd9fd7e6e021 100644 --- a/cluster-autoscaler/processors/nodeinfosprovider/mixed_nodeinfos_processor_test.go +++ b/cluster-autoscaler/processors/nodeinfosprovider/mixed_nodeinfos_processor_test.go @@ -32,6 +32,10 @@ import ( schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework" ) +var ( + cacheTtl = 1 * time.Second +) + func TestGetNodeInfosForGroups(t *testing.T) { now := time.Now() ready1 := BuildTestNode("n1", 1000, 1000) @@ -81,7 +85,7 @@ func TestGetNodeInfosForGroups(t *testing.T) { ListerRegistry: registry, }, } - res, err := NewMixedTemplateNodeInfoProvider().Process(&ctx, []*apiv1.Node{justReady5, unready4, unready3, ready2, ready1}, []*appsv1.DaemonSet{}, nil, now) + res, err := NewMixedTemplateNodeInfoProvider(&cacheTtl).Process(&ctx, []*apiv1.Node{justReady5, unready4, unready3, ready2, ready1}, []*appsv1.DaemonSet{}, nil, now) assert.NoError(t, err) assert.Equal(t, 5, len(res)) info, found := res["ng1"] @@ -108,7 +112,7 @@ func TestGetNodeInfosForGroups(t *testing.T) { ListerRegistry: registry, }, } - res, err = NewMixedTemplateNodeInfoProvider().Process(&ctx, []*apiv1.Node{}, []*appsv1.DaemonSet{}, nil, now) + res, err = NewMixedTemplateNodeInfoProvider(&cacheTtl).Process(&ctx, []*apiv1.Node{}, []*appsv1.DaemonSet{}, nil, now) assert.NoError(t, err) assert.Equal(t, 0, len(res)) } @@ -167,7 +171,7 @@ func TestGetNodeInfosForGroupsCache(t *testing.T) { ListerRegistry: registry, }, } - niProcessor := NewMixedTemplateNodeInfoProvider() + niProcessor := NewMixedTemplateNodeInfoProvider(&cacheTtl) res, err := niProcessor.Process(&ctx, []*apiv1.Node{unready4, unready3, ready2, ready1}, []*appsv1.DaemonSet{}, nil, now) assert.NoError(t, err) // Check results @@ -223,7 +227,7 @@ func TestGetNodeInfosForGroupsCache(t *testing.T) { // Fill cache manually infoNg4Node6 := schedulerframework.NewNodeInfo() infoNg4Node6.SetNode(ready6.DeepCopy()) - niProcessor.nodeInfoCache = map[string]*schedulerframework.NodeInfo{"ng4": infoNg4Node6} + niProcessor.nodeInfoCache = map[string]cacheItem{"ng4": {NodeInfo: infoNg4Node6, added: now}} res, err = niProcessor.Process(&ctx, []*apiv1.Node{unready4, unready3, ready2, ready1}, []*appsv1.DaemonSet{}, nil, now) // Check if cache was used assert.NoError(t, err) @@ -236,6 +240,55 @@ func TestGetNodeInfosForGroupsCache(t *testing.T) { assertEqualNodeCapacities(t, ready6, info.Node()) } +func TestGetNodeInfosCacheExpired(t *testing.T) { + now := time.Now() + ready1 := BuildTestNode("n1", 1000, 1000) + SetNodeReadyState(ready1, true, now.Add(-2*time.Minute)) + + // Cloud provider with TemplateNodeInfo not implemented. + provider := testprovider.NewTestAutoprovisioningCloudProvider(nil, nil, nil, nil, nil, nil) + podLister := kube_util.NewTestPodLister([]*apiv1.Pod{}) + registry := kube_util.NewListerRegistry(nil, nil, podLister, nil, nil, nil, nil, nil, nil, nil) + predicateChecker, err := simulator.NewTestPredicateChecker() + assert.NoError(t, err) + + ctx := context.AutoscalingContext{ + CloudProvider: provider, + PredicateChecker: predicateChecker, + AutoscalingKubeClients: context.AutoscalingKubeClients{ + ListerRegistry: registry, + }, + } + tn := BuildTestNode("tn", 5000, 5000) + tni := schedulerframework.NewNodeInfo() + tni.SetNode(tn) + // Cache expire time is set. + niProcessor1 := NewMixedTemplateNodeInfoProvider(&cacheTtl) + niProcessor1.nodeInfoCache = map[string]cacheItem{ + "ng1": {NodeInfo: tni, added: now.Add(-2 * time.Second)}, + "ng2": {NodeInfo: tni, added: now.Add(-2 * time.Second)}, + } + provider.AddNodeGroup("ng1", 1, 10, 1) + provider.AddNode("ng1", ready1) + + assert.Equal(t, 2, len(niProcessor1.nodeInfoCache)) + _, err = niProcessor1.Process(&ctx, []*apiv1.Node{ready1}, []*appsv1.DaemonSet{}, nil, now) + assert.NoError(t, err) + assert.Equal(t, 1, len(niProcessor1.nodeInfoCache)) + + // Cache expire time isn't set. + niProcessor2 := NewMixedTemplateNodeInfoProvider(nil) + niProcessor2.nodeInfoCache = map[string]cacheItem{ + "ng1": {NodeInfo: tni, added: now.Add(-2 * time.Second)}, + "ng2": {NodeInfo: tni, added: now.Add(-2 * time.Second)}, + } + assert.Equal(t, 2, len(niProcessor2.nodeInfoCache)) + _, err = niProcessor1.Process(&ctx, []*apiv1.Node{ready1}, []*appsv1.DaemonSet{}, nil, now) + assert.NoError(t, err) + assert.Equal(t, 2, len(niProcessor2.nodeInfoCache)) + +} + func assertEqualNodeCapacities(t *testing.T, expected, actual *apiv1.Node) { t.Helper() assert.NotEqual(t, actual.Status, nil, "") diff --git a/cluster-autoscaler/processors/nodeinfosprovider/node_info_provider_processor.go b/cluster-autoscaler/processors/nodeinfosprovider/node_info_provider_processor.go index 74f31815758e..6136cfd50c94 100644 --- a/cluster-autoscaler/processors/nodeinfosprovider/node_info_provider_processor.go +++ b/cluster-autoscaler/processors/nodeinfosprovider/node_info_provider_processor.go @@ -37,6 +37,6 @@ type TemplateNodeInfoProvider interface { } // NewDefaultTemplateNodeInfoProvider returns a default TemplateNodeInfoProvider. -func NewDefaultTemplateNodeInfoProvider() TemplateNodeInfoProvider { - return NewMixedTemplateNodeInfoProvider() +func NewDefaultTemplateNodeInfoProvider(time *time.Duration) TemplateNodeInfoProvider { + return NewMixedTemplateNodeInfoProvider(time) } diff --git a/cluster-autoscaler/processors/processors.go b/cluster-autoscaler/processors/processors.go index 36e204f5101f..f04b6ffc7464 100644 --- a/cluster-autoscaler/processors/processors.go +++ b/cluster-autoscaler/processors/processors.go @@ -77,8 +77,8 @@ func DefaultProcessors() *AutoscalingProcessors { NodeInfoProcessor: nodeinfos.NewDefaultNodeInfoProcessor(), NodeGroupConfigProcessor: nodegroupconfig.NewDefaultNodeGroupConfigProcessor(), CustomResourcesProcessor: customresources.NewDefaultCustomResourcesProcessor(), - TemplateNodeInfoProvider: nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(), ActionableClusterProcessor: actionablecluster.NewDefaultActionableClusterProcessor(), + TemplateNodeInfoProvider: nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil), } } diff --git a/cluster-autoscaler/vendor/github.com/golang/mock/mockgen/model/model.go b/cluster-autoscaler/vendor/github.com/golang/mock/mockgen/model/model.go new file mode 100644 index 000000000000..2c6a62ceb268 --- /dev/null +++ b/cluster-autoscaler/vendor/github.com/golang/mock/mockgen/model/model.go @@ -0,0 +1,495 @@ +// Copyright 2012 Google Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package model contains the data model necessary for generating mock implementations. +package model + +import ( + "encoding/gob" + "fmt" + "io" + "reflect" + "strings" +) + +// pkgPath is the importable path for package model +const pkgPath = "github.com/golang/mock/mockgen/model" + +// Package is a Go package. It may be a subset. +type Package struct { + Name string + PkgPath string + Interfaces []*Interface + DotImports []string +} + +// Print writes the package name and its exported interfaces. +func (pkg *Package) Print(w io.Writer) { + _, _ = fmt.Fprintf(w, "package %s\n", pkg.Name) + for _, intf := range pkg.Interfaces { + intf.Print(w) + } +} + +// Imports returns the imports needed by the Package as a set of import paths. +func (pkg *Package) Imports() map[string]bool { + im := make(map[string]bool) + for _, intf := range pkg.Interfaces { + intf.addImports(im) + } + return im +} + +// Interface is a Go interface. +type Interface struct { + Name string + Methods []*Method +} + +// Print writes the interface name and its methods. +func (intf *Interface) Print(w io.Writer) { + _, _ = fmt.Fprintf(w, "interface %s\n", intf.Name) + for _, m := range intf.Methods { + m.Print(w) + } +} + +func (intf *Interface) addImports(im map[string]bool) { + for _, m := range intf.Methods { + m.addImports(im) + } +} + +// AddMethod adds a new method, de-duplicating by method name. +func (intf *Interface) AddMethod(m *Method) { + for _, me := range intf.Methods { + if me.Name == m.Name { + return + } + } + intf.Methods = append(intf.Methods, m) +} + +// Method is a single method of an interface. +type Method struct { + Name string + In, Out []*Parameter + Variadic *Parameter // may be nil +} + +// Print writes the method name and its signature. +func (m *Method) Print(w io.Writer) { + _, _ = fmt.Fprintf(w, " - method %s\n", m.Name) + if len(m.In) > 0 { + _, _ = fmt.Fprintf(w, " in:\n") + for _, p := range m.In { + p.Print(w) + } + } + if m.Variadic != nil { + _, _ = fmt.Fprintf(w, " ...:\n") + m.Variadic.Print(w) + } + if len(m.Out) > 0 { + _, _ = fmt.Fprintf(w, " out:\n") + for _, p := range m.Out { + p.Print(w) + } + } +} + +func (m *Method) addImports(im map[string]bool) { + for _, p := range m.In { + p.Type.addImports(im) + } + if m.Variadic != nil { + m.Variadic.Type.addImports(im) + } + for _, p := range m.Out { + p.Type.addImports(im) + } +} + +// Parameter is an argument or return parameter of a method. +type Parameter struct { + Name string // may be empty + Type Type +} + +// Print writes a method parameter. +func (p *Parameter) Print(w io.Writer) { + n := p.Name + if n == "" { + n = `""` + } + _, _ = fmt.Fprintf(w, " - %v: %v\n", n, p.Type.String(nil, "")) +} + +// Type is a Go type. +type Type interface { + String(pm map[string]string, pkgOverride string) string + addImports(im map[string]bool) +} + +func init() { + gob.Register(&ArrayType{}) + gob.Register(&ChanType{}) + gob.Register(&FuncType{}) + gob.Register(&MapType{}) + gob.Register(&NamedType{}) + gob.Register(&PointerType{}) + + // Call gob.RegisterName to make sure it has the consistent name registered + // for both gob decoder and encoder. + // + // For a non-pointer type, gob.Register will try to get package full path by + // calling rt.PkgPath() for a name to register. If your project has vendor + // directory, it is possible that PkgPath will get a path like this: + // ../../../vendor/github.com/golang/mock/mockgen/model + gob.RegisterName(pkgPath+".PredeclaredType", PredeclaredType("")) +} + +// ArrayType is an array or slice type. +type ArrayType struct { + Len int // -1 for slices, >= 0 for arrays + Type Type +} + +func (at *ArrayType) String(pm map[string]string, pkgOverride string) string { + s := "[]" + if at.Len > -1 { + s = fmt.Sprintf("[%d]", at.Len) + } + return s + at.Type.String(pm, pkgOverride) +} + +func (at *ArrayType) addImports(im map[string]bool) { at.Type.addImports(im) } + +// ChanType is a channel type. +type ChanType struct { + Dir ChanDir // 0, 1 or 2 + Type Type +} + +func (ct *ChanType) String(pm map[string]string, pkgOverride string) string { + s := ct.Type.String(pm, pkgOverride) + if ct.Dir == RecvDir { + return "<-chan " + s + } + if ct.Dir == SendDir { + return "chan<- " + s + } + return "chan " + s +} + +func (ct *ChanType) addImports(im map[string]bool) { ct.Type.addImports(im) } + +// ChanDir is a channel direction. +type ChanDir int + +// Constants for channel directions. +const ( + RecvDir ChanDir = 1 + SendDir ChanDir = 2 +) + +// FuncType is a function type. +type FuncType struct { + In, Out []*Parameter + Variadic *Parameter // may be nil +} + +func (ft *FuncType) String(pm map[string]string, pkgOverride string) string { + args := make([]string, len(ft.In)) + for i, p := range ft.In { + args[i] = p.Type.String(pm, pkgOverride) + } + if ft.Variadic != nil { + args = append(args, "..."+ft.Variadic.Type.String(pm, pkgOverride)) + } + rets := make([]string, len(ft.Out)) + for i, p := range ft.Out { + rets[i] = p.Type.String(pm, pkgOverride) + } + retString := strings.Join(rets, ", ") + if nOut := len(ft.Out); nOut == 1 { + retString = " " + retString + } else if nOut > 1 { + retString = " (" + retString + ")" + } + return "func(" + strings.Join(args, ", ") + ")" + retString +} + +func (ft *FuncType) addImports(im map[string]bool) { + for _, p := range ft.In { + p.Type.addImports(im) + } + if ft.Variadic != nil { + ft.Variadic.Type.addImports(im) + } + for _, p := range ft.Out { + p.Type.addImports(im) + } +} + +// MapType is a map type. +type MapType struct { + Key, Value Type +} + +func (mt *MapType) String(pm map[string]string, pkgOverride string) string { + return "map[" + mt.Key.String(pm, pkgOverride) + "]" + mt.Value.String(pm, pkgOverride) +} + +func (mt *MapType) addImports(im map[string]bool) { + mt.Key.addImports(im) + mt.Value.addImports(im) +} + +// NamedType is an exported type in a package. +type NamedType struct { + Package string // may be empty + Type string +} + +func (nt *NamedType) String(pm map[string]string, pkgOverride string) string { + if pkgOverride == nt.Package { + return nt.Type + } + prefix := pm[nt.Package] + if prefix != "" { + return prefix + "." + nt.Type + } + + return nt.Type +} + +func (nt *NamedType) addImports(im map[string]bool) { + if nt.Package != "" { + im[nt.Package] = true + } +} + +// PointerType is a pointer to another type. +type PointerType struct { + Type Type +} + +func (pt *PointerType) String(pm map[string]string, pkgOverride string) string { + return "*" + pt.Type.String(pm, pkgOverride) +} +func (pt *PointerType) addImports(im map[string]bool) { pt.Type.addImports(im) } + +// PredeclaredType is a predeclared type such as "int". +type PredeclaredType string + +func (pt PredeclaredType) String(map[string]string, string) string { return string(pt) } +func (pt PredeclaredType) addImports(map[string]bool) {} + +// The following code is intended to be called by the program generated by ../reflect.go. + +// InterfaceFromInterfaceType returns a pointer to an interface for the +// given reflection interface type. +func InterfaceFromInterfaceType(it reflect.Type) (*Interface, error) { + if it.Kind() != reflect.Interface { + return nil, fmt.Errorf("%v is not an interface", it) + } + intf := &Interface{} + + for i := 0; i < it.NumMethod(); i++ { + mt := it.Method(i) + // TODO: need to skip unexported methods? or just raise an error? + m := &Method{ + Name: mt.Name, + } + + var err error + m.In, m.Variadic, m.Out, err = funcArgsFromType(mt.Type) + if err != nil { + return nil, err + } + + intf.AddMethod(m) + } + + return intf, nil +} + +// t's Kind must be a reflect.Func. +func funcArgsFromType(t reflect.Type) (in []*Parameter, variadic *Parameter, out []*Parameter, err error) { + nin := t.NumIn() + if t.IsVariadic() { + nin-- + } + var p *Parameter + for i := 0; i < nin; i++ { + p, err = parameterFromType(t.In(i)) + if err != nil { + return + } + in = append(in, p) + } + if t.IsVariadic() { + p, err = parameterFromType(t.In(nin).Elem()) + if err != nil { + return + } + variadic = p + } + for i := 0; i < t.NumOut(); i++ { + p, err = parameterFromType(t.Out(i)) + if err != nil { + return + } + out = append(out, p) + } + return +} + +func parameterFromType(t reflect.Type) (*Parameter, error) { + tt, err := typeFromType(t) + if err != nil { + return nil, err + } + return &Parameter{Type: tt}, nil +} + +var errorType = reflect.TypeOf((*error)(nil)).Elem() + +var byteType = reflect.TypeOf(byte(0)) + +func typeFromType(t reflect.Type) (Type, error) { + // Hack workaround for https://golang.org/issue/3853. + // This explicit check should not be necessary. + if t == byteType { + return PredeclaredType("byte"), nil + } + + if imp := t.PkgPath(); imp != "" { + return &NamedType{ + Package: impPath(imp), + Type: t.Name(), + }, nil + } + + // only unnamed or predeclared types after here + + // Lots of types have element types. Let's do the parsing and error checking for all of them. + var elemType Type + switch t.Kind() { + case reflect.Array, reflect.Chan, reflect.Map, reflect.Ptr, reflect.Slice: + var err error + elemType, err = typeFromType(t.Elem()) + if err != nil { + return nil, err + } + } + + switch t.Kind() { + case reflect.Array: + return &ArrayType{ + Len: t.Len(), + Type: elemType, + }, nil + case reflect.Bool, reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64, + reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64, reflect.Uintptr, + reflect.Float32, reflect.Float64, reflect.Complex64, reflect.Complex128, reflect.String: + return PredeclaredType(t.Kind().String()), nil + case reflect.Chan: + var dir ChanDir + switch t.ChanDir() { + case reflect.RecvDir: + dir = RecvDir + case reflect.SendDir: + dir = SendDir + } + return &ChanType{ + Dir: dir, + Type: elemType, + }, nil + case reflect.Func: + in, variadic, out, err := funcArgsFromType(t) + if err != nil { + return nil, err + } + return &FuncType{ + In: in, + Out: out, + Variadic: variadic, + }, nil + case reflect.Interface: + // Two special interfaces. + if t.NumMethod() == 0 { + return PredeclaredType("interface{}"), nil + } + if t == errorType { + return PredeclaredType("error"), nil + } + case reflect.Map: + kt, err := typeFromType(t.Key()) + if err != nil { + return nil, err + } + return &MapType{ + Key: kt, + Value: elemType, + }, nil + case reflect.Ptr: + return &PointerType{ + Type: elemType, + }, nil + case reflect.Slice: + return &ArrayType{ + Len: -1, + Type: elemType, + }, nil + case reflect.Struct: + if t.NumField() == 0 { + return PredeclaredType("struct{}"), nil + } + } + + // TODO: Struct, UnsafePointer + return nil, fmt.Errorf("can't yet turn %v (%v) into a model.Type", t, t.Kind()) +} + +// impPath sanitizes the package path returned by `PkgPath` method of a reflect Type so that +// it is importable. PkgPath might return a path that includes "vendor". These paths do not +// compile, so we need to remove everything up to and including "/vendor/". +// See https://github.com/golang/go/issues/12019. +func impPath(imp string) string { + if strings.HasPrefix(imp, "vendor/") { + imp = "/" + imp + } + if i := strings.LastIndex(imp, "/vendor/"); i != -1 { + imp = imp[i+len("/vendor/"):] + } + return imp +} + +// ErrorInterface represent built-in error interface. +var ErrorInterface = Interface{ + Name: "error", + Methods: []*Method{ + { + Name: "Error", + Out: []*Parameter{ + { + Name: "", + Type: PredeclaredType("string"), + }, + }, + }, + }, +} diff --git a/cluster-autoscaler/vendor/modules.txt b/cluster-autoscaler/vendor/modules.txt index de947a27d9c0..03831107ca7b 100644 --- a/cluster-autoscaler/vendor/modules.txt +++ b/cluster-autoscaler/vendor/modules.txt @@ -239,6 +239,7 @@ github.com/golang/groupcache/lru # github.com/golang/mock v1.6.0 ## explicit github.com/golang/mock/gomock +github.com/golang/mock/mockgen/model # github.com/golang/protobuf v1.5.2 github.com/golang/protobuf/descriptor github.com/golang/protobuf/jsonpb @@ -741,6 +742,7 @@ google.golang.org/genproto/googleapis/api/httpbody google.golang.org/genproto/googleapis/rpc/status google.golang.org/genproto/protobuf/field_mask # google.golang.org/grpc v1.40.0 +## explicit google.golang.org/grpc google.golang.org/grpc/attributes google.golang.org/grpc/backoff @@ -789,6 +791,7 @@ google.golang.org/grpc/stats google.golang.org/grpc/status google.golang.org/grpc/tap # google.golang.org/protobuf v1.27.1 +## explicit google.golang.org/protobuf/encoding/protojson google.golang.org/protobuf/encoding/prototext google.golang.org/protobuf/encoding/protowire diff --git a/hack/verify-golint.sh b/hack/verify-golint.sh index 40f0a324bece..8ac285d8ab60 100755 --- a/hack/verify-golint.sh +++ b/hack/verify-golint.sh @@ -36,6 +36,7 @@ excluded_packages=( 'cluster-autoscaler/cloudprovider/huaweicloud/huaweicloud-sdk-go-v3' 'cluster-autoscaler/cloudprovider/ionoscloud/ionos-cloud-sdk-go' 'cluster-autoscaler/cloudprovider/hetzner/hcloud-go' + 'cluster-autoscaler/expander/grpcplugin/protos' ) FIND_PACKAGES='go list ./... ' diff --git a/vertical-pod-autoscaler/pkg/recommender/README.md b/vertical-pod-autoscaler/pkg/recommender/README.md index c02c46e4177c..520fbb32fc2a 100644 --- a/vertical-pod-autoscaler/pkg/recommender/README.md +++ b/vertical-pod-autoscaler/pkg/recommender/README.md @@ -2,7 +2,7 @@ - [Intro](#intro) - [Running](#running) -- [Implementation](#implmentation) +- [Implementation](#implementation) ## Intro Recommender is the core binary of Vertical Pod Autoscaler system.