From a346a77d054b610eed0f156e1cf509a3a8d6effa Mon Sep 17 00:00:00 2001 From: Prateek Gogia Date: Fri, 5 Feb 2021 11:43:06 -0600 Subject: [PATCH 1/4] refactor cloud provider, add packing package --- pkg/cloudprovider/aws/fleet/capacity.go | 134 +++--------------- pkg/cloudprovider/aws/fleet/factory.go | 55 +++---- pkg/cloudprovider/aws/fleet/instance.go | 38 ++--- pkg/cloudprovider/aws/fleet/nodefactory.go | 8 +- pkg/cloudprovider/aws/fleet/vpc.go | 129 +++++++++++++++++ pkg/cloudprovider/aws/packings/binpacking.go | 40 ++++++ pkg/cloudprovider/aws/packings/factory.go | 36 +++++ pkg/cloudprovider/fake/capacity.go | 2 +- pkg/cloudprovider/types.go | 24 +++- .../v1alpha1/allocation/greedyallocator.go | 4 +- 10 files changed, 297 insertions(+), 173 deletions(-) create mode 100644 pkg/cloudprovider/aws/fleet/vpc.go create mode 100644 pkg/cloudprovider/aws/packings/binpacking.go create mode 100644 pkg/cloudprovider/aws/packings/factory.go diff --git a/pkg/cloudprovider/aws/fleet/capacity.go b/pkg/cloudprovider/aws/fleet/capacity.go index 59ea6d1c21e1..6270294782c1 100644 --- a/pkg/cloudprovider/aws/fleet/capacity.go +++ b/pkg/cloudprovider/aws/fleet/capacity.go @@ -18,45 +18,38 @@ import ( "context" "fmt" - "github.com/aws/aws-sdk-go/service/ec2" "github.com/awslabs/karpenter/pkg/apis/provisioning/v1alpha1" "github.com/awslabs/karpenter/pkg/cloudprovider" ) // Capacity cloud provider implementation using AWS Fleet. type Capacity struct { - spec *v1alpha1.ProvisionerSpec - launchTemplateProvider *LaunchTemplateProvider - subnetProvider *SubnetProvider - nodeFactory *NodeFactory - instanceProvider *InstanceProvider + spec *v1alpha1.ProvisionerSpec + nodeFactory *NodeFactory + packing cloudprovider.Packer + instanceProvider *InstanceProvider + vpc *VPCProvider } // Create a set of nodes given the constraints. -func (c *Capacity) Create(ctx context.Context, constraints *cloudprovider.CapacityConstraints) (cloudprovider.CapacityPacking, error) { - // 1. Compute Constraints - launchTemplate, err := c.launchTemplateProvider.Get(ctx, c.spec.Cluster) - if err != nil { - return nil, fmt.Errorf("getting launch template, %w", err) - } - instancePackings, err := c.instanceProvider.GetPackings(ctx, constraints.Pods, constraints.Overhead) +func (c *Capacity) Create(ctx context.Context, constraints *cloudprovider.CapacityConstraints) (cloudprovider.Packings, error) { + // 1. Compute Packing given the constraints + instancePackings, err := c.packing.Get(ctx, constraints) if err != nil { return nil, fmt.Errorf("computing bin packing, %w", err) } - zonalSubnets, err := c.getConstrainedZonalSubnets(ctx, constraints) - if err != nil { - return nil, fmt.Errorf("getting zonal subnets, %w", err) - } // 2. Create Instances var instanceIds []*string - for _, instancePacking := range instancePackings { - instanceId, err := c.instanceProvider.Create(ctx, launchTemplate, instancePacking.InstanceTypeOptions, zonalSubnets) + for tempID, instancePacking := range instancePackings { + ec2InstanceID, err := c.instanceProvider.Create(ctx, instancePacking.InstanceTypeOptions, constraints, c.spec) if err != nil { // TODO Aggregate errors and continue return nil, fmt.Errorf("creating capacity %w", err) } - instanceIds = append(instanceIds, instanceId) + instancePackings[*ec2InstanceID] = instancePacking + delete(instancePackings, tempID) + instanceIds = append(instanceIds, ec2InstanceID) } // 3. Convert to Nodes @@ -64,107 +57,14 @@ func (c *Capacity) Create(ctx context.Context, constraints *cloudprovider.Capaci if err != nil { return nil, fmt.Errorf("determining nodes, %w", err) } - - // 4. Construct capacity packing - capacityPacking := cloudprovider.CapacityPacking{} - for i, node := range nodes { - capacityPacking[node] = instancePackings[i].Pods + for instanceID, node := range nodes { + instancePackings[instanceID].Node = node } - return capacityPacking, nil + return instancePackings, nil } // GetTopologyDomains returns a set of supported domains. // e.g. us-west-2 -> [ us-west-2a, us-west-2b ] func (c *Capacity) GetTopologyDomains(ctx context.Context, key cloudprovider.TopologyKey) ([]string, error) { - switch key { - case cloudprovider.TopologyKeyZone: - zones, err := c.getZones(ctx) - if err != nil { - return nil, err - } - return zones, nil - case cloudprovider.TopologyKeySubnet: - subnets, err := c.getSubnetIds(ctx) - if err != nil { - return nil, err - } - return subnets, nil - default: - return nil, fmt.Errorf("unrecognized topology key %s", key) - } -} - -func (c *Capacity) getConstrainedZonalSubnets(ctx context.Context, constraints *cloudprovider.CapacityConstraints) (map[string][]*ec2.Subnet, error) { - // 1. Get all subnets - zonalSubnets, err := c.subnetProvider.Get(ctx, c.spec.Cluster.Name) - if err != nil { - return nil, fmt.Errorf("getting zonal subnets, %w", err) - } - // 2. Return specific subnet if specified. - if subnetId, ok := constraints.Topology[cloudprovider.TopologyKeySubnet]; ok { - for zone, subnets := range zonalSubnets { - for _, subnet := range subnets { - if subnetId == *subnet.SubnetId { - return map[string][]*ec2.Subnet{zone: {subnet}}, nil - } - } - } - return nil, fmt.Errorf("no subnet exists named %s", subnetId) - } - // 3. Constrain by zones - constrainedZones, err := c.getConstrainedZones(ctx, constraints) - if err != nil { - return nil, fmt.Errorf("getting zones, %w", err) - } - constrainedZonalSubnets := map[string][]*ec2.Subnet{} - for zone, subnets := range zonalSubnets { - for _, constrainedZone := range constrainedZones { - if zone == constrainedZone { - constrainedZonalSubnets[constrainedZone] = subnets - } - } - } - if len(constrainedZonalSubnets) == 0 { - return nil, fmt.Errorf("failed to find viable zonal subnet pairing") - } - return constrainedZonalSubnets, nil -} - -func (c *Capacity) getConstrainedZones(ctx context.Context, constraints *cloudprovider.CapacityConstraints) ([]string, error) { - // 1. Return zone if specified. - if zone, ok := constraints.Topology[cloudprovider.TopologyKeyZone]; ok { - return []string{zone}, nil - } - // 2. Return all zone options - zones, err := c.getZones(ctx) - if err != nil { - return nil, err - } - return zones, nil -} - -func (c *Capacity) getZones(ctx context.Context) ([]string, error) { - zonalSubnets, err := c.subnetProvider.Get(ctx, c.spec.Cluster.Name) - if err != nil { - return nil, err - } - zones := []string{} - for zone := range zonalSubnets { - zones = append(zones, zone) - } - return zones, nil -} - -func (c *Capacity) getSubnetIds(ctx context.Context) ([]string, error) { - zonalSubnets, err := c.subnetProvider.Get(ctx, c.spec.Cluster.Name) - if err != nil { - return nil, err - } - subnetIds := []string{} - for _, subnets := range zonalSubnets { - for _, subnet := range subnets { - subnetIds = append(subnetIds, *subnet.SubnetId) - } - } - return subnetIds, nil + return c.vpc.GetTopologyDomains(ctx, key, c.spec.Cluster.Name) } diff --git a/pkg/cloudprovider/aws/fleet/factory.go b/pkg/cloudprovider/aws/fleet/factory.go index 8630d7a1be9d..9676fdb24dfd 100644 --- a/pkg/cloudprovider/aws/fleet/factory.go +++ b/pkg/cloudprovider/aws/fleet/factory.go @@ -20,6 +20,8 @@ import ( "github.com/aws/aws-sdk-go/service/ec2/ec2iface" "github.com/aws/aws-sdk-go/service/iam/iamiface" "github.com/awslabs/karpenter/pkg/apis/provisioning/v1alpha1" + "github.com/awslabs/karpenter/pkg/cloudprovider" + "github.com/awslabs/karpenter/pkg/cloudprovider/aws/packings" "github.com/patrickmn/go-cache" "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -36,44 +38,47 @@ const ( ) func NewFactory(ec2 ec2iface.EC2API, iam iamiface.IAMAPI, kubeClient client.Client) *Factory { - return &Factory{ - ec2: ec2, - launchTemplateProvider: &LaunchTemplateProvider{ - ec2: ec2, - launchTemplateCache: cache.New(CacheTTL, CacheCleanupInterval), - instanceProfileProvider: &InstanceProfileProvider{ - iam: iam, - kubeClient: kubeClient, - instanceProfileCache: cache.New(CacheTTL, CacheCleanupInterval), - }, - securityGroupProvider: &SecurityGroupProvider{ - ec2: ec2, - securityGroupCache: cache.New(CacheTTL, CacheCleanupInterval), - }, + vpcProvider := &VPCProvider{launchTemplateProvider: &LaunchTemplateProvider{ + ec2: ec2, + launchTemplateCache: cache.New(CacheTTL, CacheCleanupInterval), + instanceProfileProvider: &InstanceProfileProvider{ + iam: iam, + kubeClient: kubeClient, + instanceProfileCache: cache.New(CacheTTL, CacheCleanupInterval), + }, + securityGroupProvider: &SecurityGroupProvider{ + ec2: ec2, + securityGroupCache: cache.New(CacheTTL, CacheCleanupInterval), }, + }, subnetProvider: &SubnetProvider{ ec2: ec2, subnetCache: cache.New(CacheTTL, CacheCleanupInterval), }, + } + return &Factory{ + ec2: ec2, + vpc: vpcProvider, nodeFactory: &NodeFactory{ec2: ec2}, - instanceProvider: &InstanceProvider{ec2: ec2}, + instanceProvider: &InstanceProvider{ec2: ec2, vpc: vpcProvider}, + packing: packings.Factory(ec2, packings.BinPacking), } } type Factory struct { - ec2 ec2iface.EC2API - launchTemplateProvider *LaunchTemplateProvider - nodeFactory *NodeFactory - subnetProvider *SubnetProvider - instanceProvider *InstanceProvider + ec2 ec2iface.EC2API + vpc *VPCProvider + nodeFactory *NodeFactory + instanceProvider *InstanceProvider + packing cloudprovider.Packer } func (f *Factory) For(spec *v1alpha1.ProvisionerSpec) *Capacity { return &Capacity{ - spec: spec, - launchTemplateProvider: f.launchTemplateProvider, - nodeFactory: f.nodeFactory, - subnetProvider: f.subnetProvider, - instanceProvider: f.instanceProvider, + spec: spec, + nodeFactory: f.nodeFactory, + instanceProvider: f.instanceProvider, + vpc: f.vpc, + packing: f.packing, } } diff --git a/pkg/cloudprovider/aws/fleet/instance.go b/pkg/cloudprovider/aws/fleet/instance.go index 286668e8d81a..cb2bffec5a7c 100644 --- a/pkg/cloudprovider/aws/fleet/instance.go +++ b/pkg/cloudprovider/aws/fleet/instance.go @@ -22,38 +22,30 @@ import ( "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/service/ec2" "github.com/aws/aws-sdk-go/service/ec2/ec2iface" - "go.uber.org/zap" - v1 "k8s.io/api/core/v1" + "github.com/awslabs/karpenter/pkg/apis/provisioning/v1alpha1" + "github.com/awslabs/karpenter/pkg/cloudprovider" ) type InstanceProvider struct { ec2 ec2iface.EC2API -} - -// Packing is a binpacking solution of pods and their viable instances types. -type Packing struct { - // Pods is a set of pods that is assigned to this packing. - Pods []*v1.Pod - // InstanceTypes is a set of instance types that is able to fit the pods. - InstanceTypeOptions []string -} - -// GetPackings for the provided pods. Computes an ordered set of viable instance -// types for each packing of pods. Instance variety enables EC2 to make better cost and availability decisions. -func (p *InstanceProvider) GetPackings(ctx context.Context, pods []*v1.Pod, overhead v1.ResourceList) ([]*Packing, error) { - zap.S().Infof("Successfully packed %d pods onto %d nodes", len(pods), 1) - return []*Packing{{ - InstanceTypeOptions: []string{"m5.large"}, // TODO, prioritize possible instance types - Pods: pods, - }}, nil + vpc *VPCProvider } // Create an instance given the constraints. func (p *InstanceProvider) Create(ctx context.Context, - launchTemplate *ec2.LaunchTemplate, instanceTypeOptions []string, - zonalSubnetOptions map[string][]*ec2.Subnet, -) (*string, error) { + constraints *cloudprovider.CapacityConstraints, + spec *v1alpha1.ProvisionerSpec) (*string, error) { + + launchTemplate, err := p.vpc.getLaunchTemplate(ctx, spec.Cluster) + if err != nil { + return nil, fmt.Errorf("getting launch template, %w", err) + } + zonalSubnetOptions, err := p.vpc.getConstrainedZonalSubnets(ctx, constraints, spec.Cluster.Name) + if err != nil { + return nil, fmt.Errorf("getting zonal subnets, %w", err) + } + // 1. Construct override options. var overrides []*ec2.FleetLaunchTemplateOverridesRequest for _, instanceType := range instanceTypeOptions { diff --git a/pkg/cloudprovider/aws/fleet/nodefactory.go b/pkg/cloudprovider/aws/fleet/nodefactory.go index 32671c2f5259..b22adbca3b70 100644 --- a/pkg/cloudprovider/aws/fleet/nodefactory.go +++ b/pkg/cloudprovider/aws/fleet/nodefactory.go @@ -33,7 +33,7 @@ type NodeFactory struct { ec2 ec2iface.EC2API } -func (n *NodeFactory) For(ctx context.Context, instanceIds []*string) ([]*v1.Node, error) { +func (n *NodeFactory) For(ctx context.Context, instanceIds []*string) (map[string]*v1.Node, error) { // Backoff retry is necessary here because EC2's APIs are eventually // consistent. In most cases, this call will only be made once. for attempt := retry.Start(retry.Exponential{ @@ -53,11 +53,11 @@ func (n *NodeFactory) For(ctx context.Context, instanceIds []*string) ([]*v1.Nod return nil, fmt.Errorf("failed to describe ec2 instances") } -func (n *NodeFactory) nodesFrom(reservations []*ec2.Reservation) []*v1.Node { - var nodes []*v1.Node +func (n *NodeFactory) nodesFrom(reservations []*ec2.Reservation) map[string]*v1.Node { + var nodes map[string]*v1.Node for _, reservation := range reservations { for _, instance := range reservation.Instances { - nodes = append(nodes, n.nodeFrom(instance)) + nodes[*instance.InstanceId] = n.nodeFrom(instance) } } return nodes diff --git a/pkg/cloudprovider/aws/fleet/vpc.go b/pkg/cloudprovider/aws/fleet/vpc.go new file mode 100644 index 000000000000..aa38f70b3498 --- /dev/null +++ b/pkg/cloudprovider/aws/fleet/vpc.go @@ -0,0 +1,129 @@ +/* +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package fleet + +import ( + "context" + "fmt" + + "github.com/aws/aws-sdk-go/service/ec2" + "github.com/awslabs/karpenter/pkg/apis/provisioning/v1alpha1" + "github.com/awslabs/karpenter/pkg/cloudprovider" +) + +type VPCProvider struct { + launchTemplateProvider *LaunchTemplateProvider + subnetProvider *SubnetProvider +} + +// GetTopologyDomains returns a set of supported domains. +// e.g. us-west-2 -> [ us-west-2a, us-west-2b ] +func (p *VPCProvider) GetTopologyDomains(ctx context.Context, key cloudprovider.TopologyKey, clusterName string) ([]string, error) { + switch key { + case cloudprovider.TopologyKeyZone: + zones, err := p.getZones(ctx, clusterName) + if err != nil { + return nil, err + } + return zones, nil + case cloudprovider.TopologyKeySubnet: + subnets, err := p.getSubnetIds(ctx, clusterName) + if err != nil { + return nil, err + } + return subnets, nil + default: + return nil, fmt.Errorf("unrecognized topology key %s", key) + } +} + +func (p *VPCProvider) getLaunchTemplate(ctx context.Context, clusterSpec *v1alpha1.ClusterSpec) (*ec2.LaunchTemplate, error) { + return p.launchTemplateProvider.Get(ctx, clusterSpec) +} + +func (p *VPCProvider) getConstrainedZonalSubnets(ctx context.Context, constraints *cloudprovider.CapacityConstraints, clusterName string) (map[string][]*ec2.Subnet, error) { + // 1. Get all subnets + zonalSubnets, err := p.subnetProvider.Get(ctx, clusterName) + if err != nil { + return nil, fmt.Errorf("getting zonal subnets, %w", err) + } + // 2. Return specific subnet if specified. + if subnetID, ok := constraints.Topology[cloudprovider.TopologyKeySubnet]; ok { + for zone, subnets := range zonalSubnets { + for _, subnet := range subnets { + if subnetID == *subnet.SubnetId { + return map[string][]*ec2.Subnet{zone: {subnet}}, nil + } + } + } + return nil, fmt.Errorf("no subnet exists named %s", subnetID) + } + // 3. Constrain by zones + constrainedZones, err := p.getConstrainedZones(ctx, constraints, clusterName) + if err != nil { + return nil, fmt.Errorf("getting zones, %w", err) + } + constrainedZonalSubnets := map[string][]*ec2.Subnet{} + for zone, subnets := range zonalSubnets { + for _, constrainedZone := range constrainedZones { + if zone == constrainedZone { + constrainedZonalSubnets[constrainedZone] = subnets + } + } + } + if len(constrainedZonalSubnets) == 0 { + return nil, fmt.Errorf("failed to find viable zonal subnet pairing") + } + return constrainedZonalSubnets, nil +} + +func (p *VPCProvider) getConstrainedZones(ctx context.Context, constraints *cloudprovider.CapacityConstraints, clusterName string) ([]string, error) { + // 1. Return zone if specified. + if zone, ok := constraints.Topology[cloudprovider.TopologyKeyZone]; ok { + return []string{zone}, nil + } + // 2. Return all zone options + zones, err := p.getZones(ctx, clusterName) + if err != nil { + return nil, err + } + return zones, nil +} + +func (p *VPCProvider) getZones(ctx context.Context, clusterName string) ([]string, error) { + zonalSubnets, err := p.subnetProvider.Get(ctx, clusterName) + if err != nil { + return nil, err + } + zones := []string{} + for zone := range zonalSubnets { + zones = append(zones, zone) + } + return zones, nil +} + +func (p *VPCProvider) getSubnetIds(ctx context.Context, clusterName string) ([]string, error) { + zonalSubnets, err := p.subnetProvider.Get(ctx, clusterName) + if err != nil { + return nil, err + } + subnetIds := []string{} + for _, subnets := range zonalSubnets { + for _, subnet := range subnets { + subnetIds = append(subnetIds, *subnet.SubnetId) + } + } + return subnetIds, nil +} diff --git a/pkg/cloudprovider/aws/packings/binpacking.go b/pkg/cloudprovider/aws/packings/binpacking.go new file mode 100644 index 000000000000..24925bd87f04 --- /dev/null +++ b/pkg/cloudprovider/aws/packings/binpacking.go @@ -0,0 +1,40 @@ +/* +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package packings + +import ( + "context" + + "github.com/aws/aws-sdk-go/service/ec2/ec2iface" + "github.com/awslabs/karpenter/pkg/cloudprovider" + "go.uber.org/zap" +) + +type binPacker struct { + ec2 ec2iface.EC2API +} + +// Get returns the packings for the provided pods. Computes an ordered set of viable instance +// types for each packing of pods. Instance variety enables EC2 to make better cost and availability decisions. +func (p *binPacker) Get(ctx context.Context, constraints *cloudprovider.CapacityConstraints) (cloudprovider.Packings, error) { + zap.S().Infof("Successfully packed %d pods onto %d nodes", len(constraints.Pods), 1) + nodeID := "1" + return cloudprovider.Packings{ + nodeID: &cloudprovider.NodePacking{ + InstanceTypeOptions: []string{"m5.large"}, // TODO, prioritize possible instance types + Pods: constraints.Pods, + }, + }, nil +} diff --git a/pkg/cloudprovider/aws/packings/factory.go b/pkg/cloudprovider/aws/packings/factory.go new file mode 100644 index 000000000000..b69d766f9e9e --- /dev/null +++ b/pkg/cloudprovider/aws/packings/factory.go @@ -0,0 +1,36 @@ +/* +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package packings + +import ( + "github.com/aws/aws-sdk-go/service/ec2/ec2iface" + "github.com/awslabs/karpenter/pkg/cloudprovider" +) + +type PackingMethod string + +const ( + BinPacking PackingMethod = "binPacking" +) + +// Factory returns a Packer to calculate the pod packing based of PackingMethod passed. +func Factory(ec2 ec2iface.EC2API, method PackingMethod) cloudprovider.Packer { + switch method { + case BinPacking: + return &binPacker{ec2: ec2} + } + //TODO add more methods + return &binPacker{ec2: ec2} +} diff --git a/pkg/cloudprovider/fake/capacity.go b/pkg/cloudprovider/fake/capacity.go index 60bb04c50e5e..cc5b3f1a8002 100644 --- a/pkg/cloudprovider/fake/capacity.go +++ b/pkg/cloudprovider/fake/capacity.go @@ -23,7 +23,7 @@ import ( type Capacity struct { } -func (c *Capacity) Create(ctx context.Context, constraints *cloudprovider.CapacityConstraints) (cloudprovider.CapacityPacking, error) { +func (c *Capacity) Create(ctx context.Context, constraints *cloudprovider.CapacityConstraints) (cloudprovider.Packings, error) { return nil, nil } diff --git a/pkg/cloudprovider/types.go b/pkg/cloudprovider/types.go index 1dcb61fe8bdc..acc42a864106 100644 --- a/pkg/cloudprovider/types.go +++ b/pkg/cloudprovider/types.go @@ -58,7 +58,7 @@ type NodeGroup interface { // Capacity provisions a set of nodes that fulfill a set of constraints. type Capacity interface { // Create a set of nodes to fulfill the desired capacity given constraints. - Create(context.Context, *CapacityConstraints) (CapacityPacking, error) + Create(context.Context, *CapacityConstraints) (Packings, error) // GetTopologyDomains returns a list of topology domains supported by the // cloud provider for the given key. @@ -85,6 +85,28 @@ type CapacityConstraints struct { // CapacityPacking is a solution to packing pods onto nodes given constraints. type CapacityPacking map[*v1.Node][]*v1.Pod +// Packings contains the result of packing decision made by a CapacityPacking, an +// instance ID as key and value is the node object and the pods packed in the instance +type Packings map[string]*NodePacking + +type NodePacking struct { + Node *v1.Node + Pods []*v1.Pod + InstanceTypeOptions []string +} + +// TODO split NodePacking +// type PackingDecision struct { +// Pods []*v1.Pod +// InstanceTypeOptions []string +// } + +// Packer is a method that takes contraints and calculates the pods that can be +// efficiently placed on the instances. +type Packer interface { + Get(ctx context.Context, constraints *CapacityConstraints) (Packings, error) +} + // TopologyKey: // https://kubernetes.io/docs/concepts/workloads/pods/pod-topology-spread-constraints/ type TopologyKey string diff --git a/pkg/controllers/provisioner/v1alpha1/allocation/greedyallocator.go b/pkg/controllers/provisioner/v1alpha1/allocation/greedyallocator.go index db29f03d8f76..f75253900259 100644 --- a/pkg/controllers/provisioner/v1alpha1/allocation/greedyallocator.go +++ b/pkg/controllers/provisioner/v1alpha1/allocation/greedyallocator.go @@ -49,8 +49,8 @@ func (a *GreedyAllocator) Allocate(provisioner *v1alpha1.Provisioner, pods []*v1 if err != nil { return fmt.Errorf("while creating capacity, %w", err) } - for node, pods := range packing { - if err := a.bind(ctx, node, pods); err != nil { + for _, pack := range packing { + if err := a.bind(ctx, pack.Node, pack.Pods); err != nil { // TODO accumulate errors if one request fails. return fmt.Errorf("binding pods to node, %w", err) } From 5845089d08580892174b7e9382a5d8c4914ecd61 Mon Sep 17 00:00:00 2001 From: Prateek Gogia Date: Sat, 6 Feb 2021 18:43:30 -0600 Subject: [PATCH 2/4] renaming and cleaning up stuff --- go.mod | 1 - pkg/cloudprovider/aws/fleet/capacity.go | 28 +++++-- pkg/cloudprovider/aws/fleet/factory.go | 29 +++---- pkg/cloudprovider/aws/fleet/instance.go | 16 +--- .../packing/packing.go} | 20 +++-- pkg/cloudprovider/aws/fleet/subnet.go | 72 ---------------- pkg/cloudprovider/aws/fleet/vpc.go | 84 +++++++++++++++---- pkg/cloudprovider/aws/packings/factory.go | 36 -------- pkg/cloudprovider/fake/capacity.go | 2 +- pkg/cloudprovider/types.go | 27 ++---- .../v1alpha1/allocation/greedyallocator.go | 16 ++-- 11 files changed, 134 insertions(+), 197 deletions(-) rename pkg/cloudprovider/aws/{packings/binpacking.go => fleet/packing/packing.go} (77%) delete mode 100644 pkg/cloudprovider/aws/fleet/subnet.go delete mode 100644 pkg/cloudprovider/aws/packings/factory.go diff --git a/go.mod b/go.mod index e7e02f31827b..7a781190736c 100644 --- a/go.mod +++ b/go.mod @@ -9,7 +9,6 @@ require ( github.com/onsi/ginkgo v1.14.2 github.com/onsi/gomega v1.10.3 github.com/patrickmn/go-cache v2.1.0+incompatible - github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.8.0 github.com/prometheus/common v0.14.0 github.com/robfig/cron/v3 v3.0.0 diff --git a/pkg/cloudprovider/aws/fleet/capacity.go b/pkg/cloudprovider/aws/fleet/capacity.go index 6270294782c1..5e331e392cc7 100644 --- a/pkg/cloudprovider/aws/fleet/capacity.go +++ b/pkg/cloudprovider/aws/fleet/capacity.go @@ -20,6 +20,7 @@ import ( "github.com/awslabs/karpenter/pkg/apis/provisioning/v1alpha1" "github.com/awslabs/karpenter/pkg/cloudprovider" + v1 "k8s.io/api/core/v1" ) // Capacity cloud provider implementation using AWS Fleet. @@ -32,23 +33,33 @@ type Capacity struct { } // Create a set of nodes given the constraints. -func (c *Capacity) Create(ctx context.Context, constraints *cloudprovider.CapacityConstraints) (cloudprovider.Packings, error) { +func (c *Capacity) Create(ctx context.Context, constraints *cloudprovider.Constraints) (cloudprovider.PodPackings, error) { // 1. Compute Packing given the constraints - instancePackings, err := c.packing.Get(ctx, constraints) + packings, err := c.packing.Pack(ctx, constraints.Pods) if err != nil { return nil, fmt.Errorf("computing bin packing, %w", err) } + launchTemplate, err := c.vpc.GetLaunchTemplate(ctx, c.spec.Cluster) + if err != nil { + return nil, fmt.Errorf("getting launch template, %w", err) + } + + zonalSubnetOptions, err := c.vpc.GetZonalSubnets(ctx, constraints, c.spec.Cluster.Name) + if err != nil { + return nil, fmt.Errorf("getting zonal subnets, %w", err) + } + // 2. Create Instances var instanceIds []*string - for tempID, instancePacking := range instancePackings { - ec2InstanceID, err := c.instanceProvider.Create(ctx, instancePacking.InstanceTypeOptions, constraints, c.spec) + podsMapped := make(map[string][]*v1.Pod) + for _, packing := range packings { + ec2InstanceID, err := c.instanceProvider.Create(ctx, packing.InstanceTypeOptions, launchTemplate, zonalSubnetOptions) if err != nil { // TODO Aggregate errors and continue return nil, fmt.Errorf("creating capacity %w", err) } - instancePackings[*ec2InstanceID] = instancePacking - delete(instancePackings, tempID) + podsMapped[*ec2InstanceID] = packing.Pods instanceIds = append(instanceIds, ec2InstanceID) } @@ -57,10 +68,11 @@ func (c *Capacity) Create(ctx context.Context, constraints *cloudprovider.Capaci if err != nil { return nil, fmt.Errorf("determining nodes, %w", err) } + nodePackings := make(cloudprovider.PodPackings) for instanceID, node := range nodes { - instancePackings[instanceID].Node = node + nodePackings[node] = podsMapped[instanceID] } - return instancePackings, nil + return nodePackings, nil } // GetTopologyDomains returns a set of supported domains. diff --git a/pkg/cloudprovider/aws/fleet/factory.go b/pkg/cloudprovider/aws/fleet/factory.go index 9676fdb24dfd..3793b6de0626 100644 --- a/pkg/cloudprovider/aws/fleet/factory.go +++ b/pkg/cloudprovider/aws/fleet/factory.go @@ -21,7 +21,7 @@ import ( "github.com/aws/aws-sdk-go/service/iam/iamiface" "github.com/awslabs/karpenter/pkg/apis/provisioning/v1alpha1" "github.com/awslabs/karpenter/pkg/cloudprovider" - "github.com/awslabs/karpenter/pkg/cloudprovider/aws/packings" + "github.com/awslabs/karpenter/pkg/cloudprovider/aws/fleet/packing" "github.com/patrickmn/go-cache" "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -38,19 +38,20 @@ const ( ) func NewFactory(ec2 ec2iface.EC2API, iam iamiface.IAMAPI, kubeClient client.Client) *Factory { - vpcProvider := &VPCProvider{launchTemplateProvider: &LaunchTemplateProvider{ - ec2: ec2, - launchTemplateCache: cache.New(CacheTTL, CacheCleanupInterval), - instanceProfileProvider: &InstanceProfileProvider{ - iam: iam, - kubeClient: kubeClient, - instanceProfileCache: cache.New(CacheTTL, CacheCleanupInterval), + vpcProvider := &VPCProvider{ + launchTemplateProvider: &LaunchTemplateProvider{ + ec2: ec2, + launchTemplateCache: cache.New(CacheTTL, CacheCleanupInterval), + instanceProfileProvider: &InstanceProfileProvider{ + iam: iam, + kubeClient: kubeClient, + instanceProfileCache: cache.New(CacheTTL, CacheCleanupInterval), + }, + securityGroupProvider: &SecurityGroupProvider{ + ec2: ec2, + securityGroupCache: cache.New(CacheTTL, CacheCleanupInterval), + }, }, - securityGroupProvider: &SecurityGroupProvider{ - ec2: ec2, - securityGroupCache: cache.New(CacheTTL, CacheCleanupInterval), - }, - }, subnetProvider: &SubnetProvider{ ec2: ec2, subnetCache: cache.New(CacheTTL, CacheCleanupInterval), @@ -61,7 +62,7 @@ func NewFactory(ec2 ec2iface.EC2API, iam iamiface.IAMAPI, kubeClient client.Clie vpc: vpcProvider, nodeFactory: &NodeFactory{ec2: ec2}, instanceProvider: &InstanceProvider{ec2: ec2, vpc: vpcProvider}, - packing: packings.Factory(ec2, packings.BinPacking), + packing: packing.NewPacker(ec2), } } diff --git a/pkg/cloudprovider/aws/fleet/instance.go b/pkg/cloudprovider/aws/fleet/instance.go index cb2bffec5a7c..10ba967f4212 100644 --- a/pkg/cloudprovider/aws/fleet/instance.go +++ b/pkg/cloudprovider/aws/fleet/instance.go @@ -22,8 +22,6 @@ import ( "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/service/ec2" "github.com/aws/aws-sdk-go/service/ec2/ec2iface" - "github.com/awslabs/karpenter/pkg/apis/provisioning/v1alpha1" - "github.com/awslabs/karpenter/pkg/cloudprovider" ) type InstanceProvider struct { @@ -34,17 +32,9 @@ type InstanceProvider struct { // Create an instance given the constraints. func (p *InstanceProvider) Create(ctx context.Context, instanceTypeOptions []string, - constraints *cloudprovider.CapacityConstraints, - spec *v1alpha1.ProvisionerSpec) (*string, error) { - - launchTemplate, err := p.vpc.getLaunchTemplate(ctx, spec.Cluster) - if err != nil { - return nil, fmt.Errorf("getting launch template, %w", err) - } - zonalSubnetOptions, err := p.vpc.getConstrainedZonalSubnets(ctx, constraints, spec.Cluster.Name) - if err != nil { - return nil, fmt.Errorf("getting zonal subnets, %w", err) - } + launchTemplate *ec2.LaunchTemplate, + zonalSubnetOptions map[string][]*ec2.Subnet, +) (*string, error) { // 1. Construct override options. var overrides []*ec2.FleetLaunchTemplateOverridesRequest diff --git a/pkg/cloudprovider/aws/packings/binpacking.go b/pkg/cloudprovider/aws/fleet/packing/packing.go similarity index 77% rename from pkg/cloudprovider/aws/packings/binpacking.go rename to pkg/cloudprovider/aws/fleet/packing/packing.go index 24925bd87f04..6229910c8e83 100644 --- a/pkg/cloudprovider/aws/packings/binpacking.go +++ b/pkg/cloudprovider/aws/fleet/packing/packing.go @@ -12,7 +12,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package packings +package packing import ( "context" @@ -20,21 +20,25 @@ import ( "github.com/aws/aws-sdk-go/service/ec2/ec2iface" "github.com/awslabs/karpenter/pkg/cloudprovider" "go.uber.org/zap" + v1 "k8s.io/api/core/v1" ) -type binPacker struct { +type Packer struct { ec2 ec2iface.EC2API } +func NewPacker(ec2 ec2iface.EC2API) *Packer { + return &Packer{ec2: ec2} +} + // Get returns the packings for the provided pods. Computes an ordered set of viable instance // types for each packing of pods. Instance variety enables EC2 to make better cost and availability decisions. -func (p *binPacker) Get(ctx context.Context, constraints *cloudprovider.CapacityConstraints) (cloudprovider.Packings, error) { - zap.S().Infof("Successfully packed %d pods onto %d nodes", len(constraints.Pods), 1) - nodeID := "1" - return cloudprovider.Packings{ - nodeID: &cloudprovider.NodePacking{ +func (p *Packer) Pack(ctx context.Context, pods []*v1.Pod) ([]*cloudprovider.Packings, error) { + zap.S().Infof("Successfully packed %d pods onto %d nodes", len(pods), 1) + return []*cloudprovider.Packings{ + { InstanceTypeOptions: []string{"m5.large"}, // TODO, prioritize possible instance types - Pods: constraints.Pods, + Pods: pods, }, }, nil } diff --git a/pkg/cloudprovider/aws/fleet/subnet.go b/pkg/cloudprovider/aws/fleet/subnet.go deleted file mode 100644 index 182711b61624..000000000000 --- a/pkg/cloudprovider/aws/fleet/subnet.go +++ /dev/null @@ -1,72 +0,0 @@ -/* -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package fleet - -import ( - "context" - "fmt" - - "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/service/ec2" - "github.com/aws/aws-sdk-go/service/ec2/ec2iface" - "github.com/patrickmn/go-cache" - "go.uber.org/zap" -) - -type ZonalSubnets map[string][]*ec2.Subnet - -type SubnetProvider struct { - ec2 ec2iface.EC2API - subnetCache *cache.Cache -} - -func NewSubnetProvider(ec2 ec2iface.EC2API) *SubnetProvider { - return &SubnetProvider{ - ec2: ec2, - subnetCache: cache.New(CacheTTL, CacheCleanupInterval), - } -} - -func (s *SubnetProvider) Get(ctx context.Context, clusterName string) (ZonalSubnets, error) { - if zonalSubnets, ok := s.subnetCache.Get(clusterName); ok { - return zonalSubnets.(ZonalSubnets), nil - } - return s.getZonalSubnets(ctx, clusterName) -} - -func (s *SubnetProvider) getZonalSubnets(ctx context.Context, clusterName string) (ZonalSubnets, error) { - describeSubnetOutput, err := s.ec2.DescribeSubnetsWithContext(ctx, &ec2.DescribeSubnetsInput{ - Filters: []*ec2.Filter{{ - Name: aws.String("tag-key"), - Values: []*string{aws.String(fmt.Sprintf(ClusterTagKeyFormat, clusterName))}, - }}, - }) - if err != nil { - return nil, fmt.Errorf("describing subnets, %w", err) - } - - zonalSubnetMap := ZonalSubnets{} - for _, subnet := range describeSubnetOutput.Subnets { - if subnets, ok := zonalSubnetMap[*subnet.AvailabilityZone]; ok { - zonalSubnetMap[*subnet.AvailabilityZone] = append(subnets, subnet) - } else { - zonalSubnetMap[*subnet.AvailabilityZone] = []*ec2.Subnet{subnet} - } - } - - s.subnetCache.Set(clusterName, zonalSubnetMap, CacheTTL) - zap.S().Infof("Successfully discovered subnets in %d zones for cluster %s", len(zonalSubnetMap), clusterName) - return zonalSubnetMap, nil -} diff --git a/pkg/cloudprovider/aws/fleet/vpc.go b/pkg/cloudprovider/aws/fleet/vpc.go index aa38f70b3498..475dae0f79f4 100644 --- a/pkg/cloudprovider/aws/fleet/vpc.go +++ b/pkg/cloudprovider/aws/fleet/vpc.go @@ -18,9 +18,13 @@ import ( "context" "fmt" + "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/service/ec2" + "github.com/aws/aws-sdk-go/service/ec2/ec2iface" "github.com/awslabs/karpenter/pkg/apis/provisioning/v1alpha1" "github.com/awslabs/karpenter/pkg/cloudprovider" + "github.com/patrickmn/go-cache" + "go.uber.org/zap" ) type VPCProvider struct { @@ -33,7 +37,7 @@ type VPCProvider struct { func (p *VPCProvider) GetTopologyDomains(ctx context.Context, key cloudprovider.TopologyKey, clusterName string) ([]string, error) { switch key { case cloudprovider.TopologyKeyZone: - zones, err := p.getZones(ctx, clusterName) + zones, err := p.GetZones(ctx, clusterName) if err != nil { return nil, err } @@ -49,11 +53,23 @@ func (p *VPCProvider) GetTopologyDomains(ctx context.Context, key cloudprovider. } } -func (p *VPCProvider) getLaunchTemplate(ctx context.Context, clusterSpec *v1alpha1.ClusterSpec) (*ec2.LaunchTemplate, error) { +func (p *VPCProvider) GetLaunchTemplate(ctx context.Context, clusterSpec *v1alpha1.ClusterSpec) (*ec2.LaunchTemplate, error) { return p.launchTemplateProvider.Get(ctx, clusterSpec) } -func (p *VPCProvider) getConstrainedZonalSubnets(ctx context.Context, constraints *cloudprovider.CapacityConstraints, clusterName string) (map[string][]*ec2.Subnet, error) { +func (p *VPCProvider) GetZones(ctx context.Context, clusterName string) ([]string, error) { + zonalSubnets, err := p.subnetProvider.Get(ctx, clusterName) + if err != nil { + return nil, err + } + zones := []string{} + for zone := range zonalSubnets { + zones = append(zones, zone) + } + return zones, nil +} + +func (p *VPCProvider) GetZonalSubnets(ctx context.Context, constraints *cloudprovider.Constraints, clusterName string) (map[string][]*ec2.Subnet, error) { // 1. Get all subnets zonalSubnets, err := p.subnetProvider.Get(ctx, clusterName) if err != nil { @@ -89,31 +105,19 @@ func (p *VPCProvider) getConstrainedZonalSubnets(ctx context.Context, constraint return constrainedZonalSubnets, nil } -func (p *VPCProvider) getConstrainedZones(ctx context.Context, constraints *cloudprovider.CapacityConstraints, clusterName string) ([]string, error) { +func (p *VPCProvider) getConstrainedZones(ctx context.Context, constraints *cloudprovider.Constraints, clusterName string) ([]string, error) { // 1. Return zone if specified. if zone, ok := constraints.Topology[cloudprovider.TopologyKeyZone]; ok { return []string{zone}, nil } // 2. Return all zone options - zones, err := p.getZones(ctx, clusterName) + zones, err := p.GetZones(ctx, clusterName) if err != nil { return nil, err } return zones, nil } -func (p *VPCProvider) getZones(ctx context.Context, clusterName string) ([]string, error) { - zonalSubnets, err := p.subnetProvider.Get(ctx, clusterName) - if err != nil { - return nil, err - } - zones := []string{} - for zone := range zonalSubnets { - zones = append(zones, zone) - } - return zones, nil -} - func (p *VPCProvider) getSubnetIds(ctx context.Context, clusterName string) ([]string, error) { zonalSubnets, err := p.subnetProvider.Get(ctx, clusterName) if err != nil { @@ -127,3 +131,49 @@ func (p *VPCProvider) getSubnetIds(ctx context.Context, clusterName string) ([]s } return subnetIds, nil } + +type ZonalSubnets map[string][]*ec2.Subnet + +type SubnetProvider struct { + ec2 ec2iface.EC2API + subnetCache *cache.Cache +} + +func NewSubnetProvider(ec2 ec2iface.EC2API) *SubnetProvider { + return &SubnetProvider{ + ec2: ec2, + subnetCache: cache.New(CacheTTL, CacheCleanupInterval), + } +} + +func (s *SubnetProvider) Get(ctx context.Context, clusterName string) (ZonalSubnets, error) { + if zonalSubnets, ok := s.subnetCache.Get(clusterName); ok { + return zonalSubnets.(ZonalSubnets), nil + } + return s.getZonalSubnets(ctx, clusterName) +} + +func (s *SubnetProvider) getZonalSubnets(ctx context.Context, clusterName string) (ZonalSubnets, error) { + describeSubnetOutput, err := s.ec2.DescribeSubnetsWithContext(ctx, &ec2.DescribeSubnetsInput{ + Filters: []*ec2.Filter{{ + Name: aws.String("tag-key"), + Values: []*string{aws.String(fmt.Sprintf(ClusterTagKeyFormat, clusterName))}, + }}, + }) + if err != nil { + return nil, fmt.Errorf("describing subnets, %w", err) + } + + zonalSubnetMap := ZonalSubnets{} + for _, subnet := range describeSubnetOutput.Subnets { + if subnets, ok := zonalSubnetMap[*subnet.AvailabilityZone]; ok { + zonalSubnetMap[*subnet.AvailabilityZone] = append(subnets, subnet) + } else { + zonalSubnetMap[*subnet.AvailabilityZone] = []*ec2.Subnet{subnet} + } + } + + s.subnetCache.Set(clusterName, zonalSubnetMap, CacheTTL) + zap.S().Infof("Successfully discovered subnets in %d zones for cluster %s", len(zonalSubnetMap), clusterName) + return zonalSubnetMap, nil +} diff --git a/pkg/cloudprovider/aws/packings/factory.go b/pkg/cloudprovider/aws/packings/factory.go deleted file mode 100644 index b69d766f9e9e..000000000000 --- a/pkg/cloudprovider/aws/packings/factory.go +++ /dev/null @@ -1,36 +0,0 @@ -/* -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package packings - -import ( - "github.com/aws/aws-sdk-go/service/ec2/ec2iface" - "github.com/awslabs/karpenter/pkg/cloudprovider" -) - -type PackingMethod string - -const ( - BinPacking PackingMethod = "binPacking" -) - -// Factory returns a Packer to calculate the pod packing based of PackingMethod passed. -func Factory(ec2 ec2iface.EC2API, method PackingMethod) cloudprovider.Packer { - switch method { - case BinPacking: - return &binPacker{ec2: ec2} - } - //TODO add more methods - return &binPacker{ec2: ec2} -} diff --git a/pkg/cloudprovider/fake/capacity.go b/pkg/cloudprovider/fake/capacity.go index cc5b3f1a8002..ceeb3b3d957c 100644 --- a/pkg/cloudprovider/fake/capacity.go +++ b/pkg/cloudprovider/fake/capacity.go @@ -23,7 +23,7 @@ import ( type Capacity struct { } -func (c *Capacity) Create(ctx context.Context, constraints *cloudprovider.CapacityConstraints) (cloudprovider.Packings, error) { +func (c *Capacity) Create(ctx context.Context, constraints *cloudprovider.Constraints) (cloudprovider.PodPackings, error) { return nil, nil } diff --git a/pkg/cloudprovider/types.go b/pkg/cloudprovider/types.go index acc42a864106..af6ab8e3e2f7 100644 --- a/pkg/cloudprovider/types.go +++ b/pkg/cloudprovider/types.go @@ -58,18 +58,18 @@ type NodeGroup interface { // Capacity provisions a set of nodes that fulfill a set of constraints. type Capacity interface { // Create a set of nodes to fulfill the desired capacity given constraints. - Create(context.Context, *CapacityConstraints) (Packings, error) + Create(context.Context, *Constraints) (PodPackings, error) // GetTopologyDomains returns a list of topology domains supported by the // cloud provider for the given key. // For example, GetTopologyDomains("zone") -> [ "us-west-2a", "us-west-2b" ] - // This enables the caller to to build CapacityConstraints for a known set of + // This enables the caller to to build Constraints for a known set of GetTopologyDomains(context.Context, TopologyKey) ([]string, error) } -// CapacityConstraints lets the controller define the desired capacity, +// Constraints lets the controller define the desired capacity, // avalability zone, architecture for the desired nodes. -type CapacityConstraints struct { +type Constraints struct { // Pods is a list of equivalently schedulable pods to be efficiently // binpacked. Pods []*v1.Pod @@ -82,29 +82,18 @@ type CapacityConstraints struct { Architecture Architecture } -// CapacityPacking is a solution to packing pods onto nodes given constraints. -type CapacityPacking map[*v1.Node][]*v1.Pod +// PodPackings is a solution to packing pods onto nodes given constraints. +type PodPackings map[*v1.Node][]*v1.Pod -// Packings contains the result of packing decision made by a CapacityPacking, an -// instance ID as key and value is the node object and the pods packed in the instance -type Packings map[string]*NodePacking - -type NodePacking struct { - Node *v1.Node +type Packings struct { Pods []*v1.Pod InstanceTypeOptions []string } -// TODO split NodePacking -// type PackingDecision struct { -// Pods []*v1.Pod -// InstanceTypeOptions []string -// } - // Packer is a method that takes contraints and calculates the pods that can be // efficiently placed on the instances. type Packer interface { - Get(ctx context.Context, constraints *CapacityConstraints) (Packings, error) + Pack(ctx context.Context, pods []*v1.Pod) ([]*Packings, error) } // TopologyKey: diff --git a/pkg/controllers/provisioner/v1alpha1/allocation/greedyallocator.go b/pkg/controllers/provisioner/v1alpha1/allocation/greedyallocator.go index f75253900259..a159eb223aa1 100644 --- a/pkg/controllers/provisioner/v1alpha1/allocation/greedyallocator.go +++ b/pkg/controllers/provisioner/v1alpha1/allocation/greedyallocator.go @@ -44,13 +44,13 @@ func (a *GreedyAllocator) Allocate(provisioner *v1alpha1.Provisioner, pods []*v1 zap.S().Infof("Allocating %d pending pods from %d constraint groups", len(pods), len(groups)) // 2. Group pods into equally schedulable constraint group for _, constraints := range groups { - packing, err := a.CloudProvider.CapacityFor(&provisioner.Spec).Create(ctx, constraints) + packings, err := a.CloudProvider.CapacityFor(&provisioner.Spec).Create(ctx, constraints) // TODO accumulate errors if one request fails. if err != nil { return fmt.Errorf("while creating capacity, %w", err) } - for _, pack := range packing { - if err := a.bind(ctx, pack.Node, pack.Pods); err != nil { + for node, pods := range packings { + if err := a.bind(ctx, node, pods); err != nil { // TODO accumulate errors if one request fails. return fmt.Errorf("binding pods to node, %w", err) } @@ -78,8 +78,8 @@ func (a *GreedyAllocator) bind(ctx context.Context, node *v1.Node, pods []*v1.Po return nil } -func (a *GreedyAllocator) getSchedulingGroups(pods []*v1.Pod) []*cloudprovider.CapacityConstraints { - schedulingGroups := []*cloudprovider.CapacityConstraints{} +func (a *GreedyAllocator) getSchedulingGroups(pods []*v1.Pod) []*cloudprovider.Constraints { + schedulingGroups := []*cloudprovider.Constraints{} for _, pod := range pods { added := false for _, constraints := range schedulingGroups { @@ -98,12 +98,12 @@ func (a *GreedyAllocator) getSchedulingGroups(pods []*v1.Pod) []*cloudprovider.C } // TODO -func (a *GreedyAllocator) matchesConstraints(constraints *cloudprovider.CapacityConstraints, pod *v1.Pod) bool { +func (a *GreedyAllocator) matchesConstraints(constraints *cloudprovider.Constraints, pod *v1.Pod) bool { return false } -func constraintsForPod(pod *v1.Pod) *cloudprovider.CapacityConstraints { - return &cloudprovider.CapacityConstraints{ +func constraintsForPod(pod *v1.Pod) *cloudprovider.Constraints { + return &cloudprovider.Constraints{ Overhead: calculateOverheadResources(), Architecture: getSystemArchitecture(pod), Topology: map[cloudprovider.TopologyKey]string{}, From 2d7941ec139a0d011c1982dd46bcc0beb25b1be7 Mon Sep 17 00:00:00 2001 From: Prateek Gogia Date: Sat, 6 Feb 2021 18:50:02 -0600 Subject: [PATCH 3/4] move GetTopologyDomains back to capacity --- pkg/cloudprovider/aws/fleet/capacity.go | 17 ++++++++- pkg/cloudprovider/aws/fleet/vpc.go | 49 +++++++------------------ 2 files changed, 30 insertions(+), 36 deletions(-) diff --git a/pkg/cloudprovider/aws/fleet/capacity.go b/pkg/cloudprovider/aws/fleet/capacity.go index 5e331e392cc7..62365eb7fbb3 100644 --- a/pkg/cloudprovider/aws/fleet/capacity.go +++ b/pkg/cloudprovider/aws/fleet/capacity.go @@ -78,5 +78,20 @@ func (c *Capacity) Create(ctx context.Context, constraints *cloudprovider.Constr // GetTopologyDomains returns a set of supported domains. // e.g. us-west-2 -> [ us-west-2a, us-west-2b ] func (c *Capacity) GetTopologyDomains(ctx context.Context, key cloudprovider.TopologyKey) ([]string, error) { - return c.vpc.GetTopologyDomains(ctx, key, c.spec.Cluster.Name) + switch key { + case cloudprovider.TopologyKeyZone: + zones, err := c.vpc.GetZones(ctx, c.spec.Cluster.Name) + if err != nil { + return nil, err + } + return zones, nil + case cloudprovider.TopologyKeySubnet: + subnets, err := c.vpc.GetSubnetIds(ctx, c.spec.Cluster.Name) + if err != nil { + return nil, err + } + return subnets, nil + default: + return nil, fmt.Errorf("unrecognized topology key %s", key) + } } diff --git a/pkg/cloudprovider/aws/fleet/vpc.go b/pkg/cloudprovider/aws/fleet/vpc.go index 475dae0f79f4..4f7d0dc5f256 100644 --- a/pkg/cloudprovider/aws/fleet/vpc.go +++ b/pkg/cloudprovider/aws/fleet/vpc.go @@ -32,27 +32,6 @@ type VPCProvider struct { subnetProvider *SubnetProvider } -// GetTopologyDomains returns a set of supported domains. -// e.g. us-west-2 -> [ us-west-2a, us-west-2b ] -func (p *VPCProvider) GetTopologyDomains(ctx context.Context, key cloudprovider.TopologyKey, clusterName string) ([]string, error) { - switch key { - case cloudprovider.TopologyKeyZone: - zones, err := p.GetZones(ctx, clusterName) - if err != nil { - return nil, err - } - return zones, nil - case cloudprovider.TopologyKeySubnet: - subnets, err := p.getSubnetIds(ctx, clusterName) - if err != nil { - return nil, err - } - return subnets, nil - default: - return nil, fmt.Errorf("unrecognized topology key %s", key) - } -} - func (p *VPCProvider) GetLaunchTemplate(ctx context.Context, clusterSpec *v1alpha1.ClusterSpec) (*ec2.LaunchTemplate, error) { return p.launchTemplateProvider.Get(ctx, clusterSpec) } @@ -105,20 +84,7 @@ func (p *VPCProvider) GetZonalSubnets(ctx context.Context, constraints *cloudpro return constrainedZonalSubnets, nil } -func (p *VPCProvider) getConstrainedZones(ctx context.Context, constraints *cloudprovider.Constraints, clusterName string) ([]string, error) { - // 1. Return zone if specified. - if zone, ok := constraints.Topology[cloudprovider.TopologyKeyZone]; ok { - return []string{zone}, nil - } - // 2. Return all zone options - zones, err := p.GetZones(ctx, clusterName) - if err != nil { - return nil, err - } - return zones, nil -} - -func (p *VPCProvider) getSubnetIds(ctx context.Context, clusterName string) ([]string, error) { +func (p *VPCProvider) GetSubnetIds(ctx context.Context, clusterName string) ([]string, error) { zonalSubnets, err := p.subnetProvider.Get(ctx, clusterName) if err != nil { return nil, err @@ -132,6 +98,19 @@ func (p *VPCProvider) getSubnetIds(ctx context.Context, clusterName string) ([]s return subnetIds, nil } +func (p *VPCProvider) getConstrainedZones(ctx context.Context, constraints *cloudprovider.Constraints, clusterName string) ([]string, error) { + // 1. Return zone if specified. + if zone, ok := constraints.Topology[cloudprovider.TopologyKeyZone]; ok { + return []string{zone}, nil + } + // 2. Return all zone options + zones, err := p.GetZones(ctx, clusterName) + if err != nil { + return nil, err + } + return zones, nil +} + type ZonalSubnets map[string][]*ec2.Subnet type SubnetProvider struct { From 72857e59f627ae6d2821eb5011c4efe8425f61cd Mon Sep 17 00:00:00 2001 From: Prateek Gogia Date: Sun, 7 Feb 2021 11:12:25 -0600 Subject: [PATCH 4/4] Address comments and move packer to packing pkg --- pkg/cloudprovider/aws/fleet/capacity.go | 27 +++++++++-------- pkg/cloudprovider/aws/fleet/factory.go | 13 ++++---- pkg/cloudprovider/aws/fleet/instance.go | 3 +- pkg/cloudprovider/aws/fleet/nodefactory.go | 1 + .../aws/fleet/packing/packing.go | 30 +++++++++++++------ pkg/cloudprovider/fake/capacity.go | 2 +- pkg/cloudprovider/types.go | 17 ++--------- 7 files changed, 47 insertions(+), 46 deletions(-) diff --git a/pkg/cloudprovider/aws/fleet/capacity.go b/pkg/cloudprovider/aws/fleet/capacity.go index 62365eb7fbb3..65158df10b11 100644 --- a/pkg/cloudprovider/aws/fleet/capacity.go +++ b/pkg/cloudprovider/aws/fleet/capacity.go @@ -20,6 +20,7 @@ import ( "github.com/awslabs/karpenter/pkg/apis/provisioning/v1alpha1" "github.com/awslabs/karpenter/pkg/cloudprovider" + "github.com/awslabs/karpenter/pkg/cloudprovider/aws/fleet/packing" v1 "k8s.io/api/core/v1" ) @@ -27,25 +28,25 @@ import ( type Capacity struct { spec *v1alpha1.ProvisionerSpec nodeFactory *NodeFactory - packing cloudprovider.Packer + packer packing.Packer instanceProvider *InstanceProvider - vpc *VPCProvider + vpcProvider *VPCProvider } // Create a set of nodes given the constraints. -func (c *Capacity) Create(ctx context.Context, constraints *cloudprovider.Constraints) (cloudprovider.PodPackings, error) { +func (c *Capacity) Create(ctx context.Context, constraints *cloudprovider.Constraints) (cloudprovider.NodePackings, error) { // 1. Compute Packing given the constraints - packings, err := c.packing.Pack(ctx, constraints.Pods) + instancePackings, err := c.packer.Pack(ctx, constraints.Pods) if err != nil { return nil, fmt.Errorf("computing bin packing, %w", err) } - launchTemplate, err := c.vpc.GetLaunchTemplate(ctx, c.spec.Cluster) + launchTemplate, err := c.vpcProvider.GetLaunchTemplate(ctx, c.spec.Cluster) if err != nil { return nil, fmt.Errorf("getting launch template, %w", err) } - zonalSubnetOptions, err := c.vpc.GetZonalSubnets(ctx, constraints, c.spec.Cluster.Name) + zonalSubnetOptions, err := c.vpcProvider.GetZonalSubnets(ctx, constraints, c.spec.Cluster.Name) if err != nil { return nil, fmt.Errorf("getting zonal subnets, %w", err) } @@ -53,14 +54,14 @@ func (c *Capacity) Create(ctx context.Context, constraints *cloudprovider.Constr // 2. Create Instances var instanceIds []*string podsMapped := make(map[string][]*v1.Pod) - for _, packing := range packings { - ec2InstanceID, err := c.instanceProvider.Create(ctx, packing.InstanceTypeOptions, launchTemplate, zonalSubnetOptions) + for _, packing := range instancePackings { + instanceID, err := c.instanceProvider.Create(ctx, launchTemplate, packing.InstanceTypeOptions, zonalSubnetOptions) if err != nil { // TODO Aggregate errors and continue return nil, fmt.Errorf("creating capacity %w", err) } - podsMapped[*ec2InstanceID] = packing.Pods - instanceIds = append(instanceIds, ec2InstanceID) + podsMapped[*instanceID] = packing.Pods + instanceIds = append(instanceIds, instanceID) } // 3. Convert to Nodes @@ -68,7 +69,7 @@ func (c *Capacity) Create(ctx context.Context, constraints *cloudprovider.Constr if err != nil { return nil, fmt.Errorf("determining nodes, %w", err) } - nodePackings := make(cloudprovider.PodPackings) + nodePackings := make(cloudprovider.NodePackings) for instanceID, node := range nodes { nodePackings[node] = podsMapped[instanceID] } @@ -80,13 +81,13 @@ func (c *Capacity) Create(ctx context.Context, constraints *cloudprovider.Constr func (c *Capacity) GetTopologyDomains(ctx context.Context, key cloudprovider.TopologyKey) ([]string, error) { switch key { case cloudprovider.TopologyKeyZone: - zones, err := c.vpc.GetZones(ctx, c.spec.Cluster.Name) + zones, err := c.vpcProvider.GetZones(ctx, c.spec.Cluster.Name) if err != nil { return nil, err } return zones, nil case cloudprovider.TopologyKeySubnet: - subnets, err := c.vpc.GetSubnetIds(ctx, c.spec.Cluster.Name) + subnets, err := c.vpcProvider.GetSubnetIds(ctx, c.spec.Cluster.Name) if err != nil { return nil, err } diff --git a/pkg/cloudprovider/aws/fleet/factory.go b/pkg/cloudprovider/aws/fleet/factory.go index 3793b6de0626..b57408455f4e 100644 --- a/pkg/cloudprovider/aws/fleet/factory.go +++ b/pkg/cloudprovider/aws/fleet/factory.go @@ -20,7 +20,6 @@ import ( "github.com/aws/aws-sdk-go/service/ec2/ec2iface" "github.com/aws/aws-sdk-go/service/iam/iamiface" "github.com/awslabs/karpenter/pkg/apis/provisioning/v1alpha1" - "github.com/awslabs/karpenter/pkg/cloudprovider" "github.com/awslabs/karpenter/pkg/cloudprovider/aws/fleet/packing" "github.com/patrickmn/go-cache" "sigs.k8s.io/controller-runtime/pkg/client" @@ -59,19 +58,19 @@ func NewFactory(ec2 ec2iface.EC2API, iam iamiface.IAMAPI, kubeClient client.Clie } return &Factory{ ec2: ec2, - vpc: vpcProvider, + vpcProvider: vpcProvider, nodeFactory: &NodeFactory{ec2: ec2}, instanceProvider: &InstanceProvider{ec2: ec2, vpc: vpcProvider}, - packing: packing.NewPacker(ec2), + packer: packing.NewPacker(ec2), } } type Factory struct { ec2 ec2iface.EC2API - vpc *VPCProvider + vpcProvider *VPCProvider nodeFactory *NodeFactory instanceProvider *InstanceProvider - packing cloudprovider.Packer + packer packing.Packer } func (f *Factory) For(spec *v1alpha1.ProvisionerSpec) *Capacity { @@ -79,7 +78,7 @@ func (f *Factory) For(spec *v1alpha1.ProvisionerSpec) *Capacity { spec: spec, nodeFactory: f.nodeFactory, instanceProvider: f.instanceProvider, - vpc: f.vpc, - packing: f.packing, + vpcProvider: f.vpcProvider, + packer: f.packer, } } diff --git a/pkg/cloudprovider/aws/fleet/instance.go b/pkg/cloudprovider/aws/fleet/instance.go index 10ba967f4212..4bb0e79ac4d0 100644 --- a/pkg/cloudprovider/aws/fleet/instance.go +++ b/pkg/cloudprovider/aws/fleet/instance.go @@ -31,11 +31,10 @@ type InstanceProvider struct { // Create an instance given the constraints. func (p *InstanceProvider) Create(ctx context.Context, - instanceTypeOptions []string, launchTemplate *ec2.LaunchTemplate, + instanceTypeOptions []string, zonalSubnetOptions map[string][]*ec2.Subnet, ) (*string, error) { - // 1. Construct override options. var overrides []*ec2.FleetLaunchTemplateOverridesRequest for _, instanceType := range instanceTypeOptions { diff --git a/pkg/cloudprovider/aws/fleet/nodefactory.go b/pkg/cloudprovider/aws/fleet/nodefactory.go index b22adbca3b70..b006b92a5e7e 100644 --- a/pkg/cloudprovider/aws/fleet/nodefactory.go +++ b/pkg/cloudprovider/aws/fleet/nodefactory.go @@ -33,6 +33,7 @@ type NodeFactory struct { ec2 ec2iface.EC2API } +// For a given set of instanceIds return a map of instanceID to Kubernetes node object. func (n *NodeFactory) For(ctx context.Context, instanceIds []*string) (map[string]*v1.Node, error) { // Backoff retry is necessary here because EC2's APIs are eventually // consistent. In most cases, this call will only be made once. diff --git a/pkg/cloudprovider/aws/fleet/packing/packing.go b/pkg/cloudprovider/aws/fleet/packing/packing.go index 6229910c8e83..3c46698bc895 100644 --- a/pkg/cloudprovider/aws/fleet/packing/packing.go +++ b/pkg/cloudprovider/aws/fleet/packing/packing.go @@ -18,24 +18,36 @@ import ( "context" "github.com/aws/aws-sdk-go/service/ec2/ec2iface" - "github.com/awslabs/karpenter/pkg/cloudprovider" "go.uber.org/zap" v1 "k8s.io/api/core/v1" ) -type Packer struct { +type PodPacker struct { ec2 ec2iface.EC2API } -func NewPacker(ec2 ec2iface.EC2API) *Packer { - return &Packer{ec2: ec2} +// PodPacker helps pack the pods and calculates efficient placement on the instances. +type Packer interface { + Pack(ctx context.Context, pods []*v1.Pod) ([]*Packings, error) } -// Get returns the packings for the provided pods. Computes an ordered set of viable instance -// types for each packing of pods. Instance variety enables EC2 to make better cost and availability decisions. -func (p *Packer) Pack(ctx context.Context, pods []*v1.Pod) ([]*cloudprovider.Packings, error) { - zap.S().Infof("Successfully packed %d pods onto %d nodes", len(pods), 1) - return []*cloudprovider.Packings{ +// Packings contains a list of pods that can be placed on any of Instance type +// in the InstanceTypeOptions +type Packings struct { + Pods []*v1.Pod + InstanceTypeOptions []string +} + +func NewPacker(ec2 ec2iface.EC2API) *PodPacker { + return &PodPacker{ec2: ec2} +} + +// Pack returns the packings for the provided pods. Computes a set of viable +// instance types for each packing of pods. Instance variety enables EC2 to make +// better cost and availability decisions. +func (p *PodPacker) Pack(ctx context.Context, pods []*v1.Pod) ([]*Packings, error) { + zap.S().Debugf("Successfully packed %d pods onto %d nodes", len(pods), 1) + return []*Packings{ { InstanceTypeOptions: []string{"m5.large"}, // TODO, prioritize possible instance types Pods: pods, diff --git a/pkg/cloudprovider/fake/capacity.go b/pkg/cloudprovider/fake/capacity.go index ceeb3b3d957c..3f2c87b7552a 100644 --- a/pkg/cloudprovider/fake/capacity.go +++ b/pkg/cloudprovider/fake/capacity.go @@ -23,7 +23,7 @@ import ( type Capacity struct { } -func (c *Capacity) Create(ctx context.Context, constraints *cloudprovider.Constraints) (cloudprovider.PodPackings, error) { +func (c *Capacity) Create(ctx context.Context, constraints *cloudprovider.Constraints) (cloudprovider.NodePackings, error) { return nil, nil } diff --git a/pkg/cloudprovider/types.go b/pkg/cloudprovider/types.go index af6ab8e3e2f7..26a29255f42e 100644 --- a/pkg/cloudprovider/types.go +++ b/pkg/cloudprovider/types.go @@ -58,7 +58,7 @@ type NodeGroup interface { // Capacity provisions a set of nodes that fulfill a set of constraints. type Capacity interface { // Create a set of nodes to fulfill the desired capacity given constraints. - Create(context.Context, *Constraints) (PodPackings, error) + Create(context.Context, *Constraints) (NodePackings, error) // GetTopologyDomains returns a list of topology domains supported by the // cloud provider for the given key. @@ -82,19 +82,8 @@ type Constraints struct { Architecture Architecture } -// PodPackings is a solution to packing pods onto nodes given constraints. -type PodPackings map[*v1.Node][]*v1.Pod - -type Packings struct { - Pods []*v1.Pod - InstanceTypeOptions []string -} - -// Packer is a method that takes contraints and calculates the pods that can be -// efficiently placed on the instances. -type Packer interface { - Pack(ctx context.Context, pods []*v1.Pod) ([]*Packings, error) -} +// NodePackings is a solution to packing pods onto nodes given constraints. +type NodePackings map[*v1.Node][]*v1.Pod // TopologyKey: // https://kubernetes.io/docs/concepts/workloads/pods/pod-topology-spread-constraints/