Refactor cloud provider, add packing package #221

Merged
merged 4 commits into from
Feb 8, 2021
Changes from 3 commits
1 change: 0 additions & 1 deletion go.mod
@@ -9,7 +9,6 @@ require (
github.com/onsi/ginkgo v1.14.2
github.com/onsi/gomega v1.10.3
github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.8.0
github.com/prometheus/common v0.14.0
github.com/robfig/cron/v3 v3.0.0
125 changes: 26 additions & 99 deletions pkg/cloudprovider/aws/fleet/capacity.go
@@ -18,73 +18,75 @@ import (
"context"
"fmt"

"github.com/aws/aws-sdk-go/service/ec2"
"github.com/awslabs/karpenter/pkg/apis/provisioning/v1alpha1"
"github.com/awslabs/karpenter/pkg/cloudprovider"
v1 "k8s.io/api/core/v1"
)

// Capacity cloud provider implementation using AWS Fleet.
type Capacity struct {
spec *v1alpha1.ProvisionerSpec
launchTemplateProvider *LaunchTemplateProvider
subnetProvider *SubnetProvider
nodeFactory *NodeFactory
instanceProvider *InstanceProvider
spec *v1alpha1.ProvisionerSpec
nodeFactory *NodeFactory
packing cloudprovider.Packer
instanceProvider *InstanceProvider
vpc *VPCProvider
}

// Create a set of nodes given the constraints.
func (c *Capacity) Create(ctx context.Context, constraints *cloudprovider.CapacityConstraints) (cloudprovider.CapacityPacking, error) {
// 1. Compute Constraints
launchTemplate, err := c.launchTemplateProvider.Get(ctx, c.spec.Cluster)
func (c *Capacity) Create(ctx context.Context, constraints *cloudprovider.Constraints) (cloudprovider.PodPackings, error) {
// 1. Compute Packing given the constraints
packings, err := c.packing.Pack(ctx, constraints.Pods)
if err != nil {
return nil, fmt.Errorf("getting launch template, %w", err)
return nil, fmt.Errorf("computing bin packing, %w", err)
}
instancePackings, err := c.instanceProvider.GetPackings(ctx, constraints.Pods, constraints.Overhead)

launchTemplate, err := c.vpc.GetLaunchTemplate(ctx, c.spec.Cluster)
if err != nil {
return nil, fmt.Errorf("computing bin packing, %w", err)
return nil, fmt.Errorf("getting launch template, %w", err)
}
zonalSubnets, err := c.getConstrainedZonalSubnets(ctx, constraints)

zonalSubnetOptions, err := c.vpc.GetZonalSubnets(ctx, constraints, c.spec.Cluster.Name)
if err != nil {
return nil, fmt.Errorf("getting zonal subnets, %w", err)
}

// 2. Create Instances
var instanceIds []*string
for _, instancePacking := range instancePackings {
instanceId, err := c.instanceProvider.Create(ctx, launchTemplate, instancePacking.InstanceTypeOptions, zonalSubnets)
podsMapped := make(map[string][]*v1.Pod)
for _, packing := range packings {
ec2InstanceID, err := c.instanceProvider.Create(ctx, packing.InstanceTypeOptions, launchTemplate, zonalSubnetOptions)
Review comment (Contributor):
nit: I'd just call this instanceId, for consistency with the rest of the code. The EC2 prefix doesn't add much value to the name here.

if err != nil {
// TODO Aggregate errors and continue
return nil, fmt.Errorf("creating capacity, %w", err)
}
instanceIds = append(instanceIds, instanceId)
podsMapped[*ec2InstanceID] = packing.Pods
instanceIds = append(instanceIds, ec2InstanceID)
}

// 3. Convert to Nodes
nodes, err := c.nodeFactory.For(ctx, instanceIds)
if err != nil {
return nil, fmt.Errorf("determining nodes, %w", err)
}

// 4. Construct capacity packing
capacityPacking := cloudprovider.CapacityPacking{}
for i, node := range nodes {
capacityPacking[node] = instancePackings[i].Pods
nodePackings := make(cloudprovider.PodPackings)
Review comment (Contributor):
This naming seems odd to me. I typically name variables after their type. Here it's not clear whether the concept we're dealing with is a "pod packing" or a "node packing". Whatever we call it, let's be consistent.

I also see that you're a little cornered, since this function has two packing concepts (not to mention the misnamed "podpackings"). Off the top of my head, I might call the one above an "InstancePacking", since it's specific to EC2 instances, and the second one a "NodePacking", since it's a Kubernetes node.

for instanceID, node := range nodes {
nodePackings[node] = podsMapped[instanceID]
}
return capacityPacking, nil
return nodePackings, nil
}
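The bookkeeping in Create above can be sketched in isolation: pods are first recorded under the instance ID they were packed onto, and once nodes have been resolved by instance ID, the pods are re-keyed by node. This is a stdlib-only illustration; `Pod`, `Node`, and `nodePackings` are hypothetical stand-ins for the Kubernetes and cloudprovider types, not the project's API.

```go
package main

import "fmt"

// Pod and Node are hypothetical stand-ins for *v1.Pod and *v1.Node.
type Pod struct{ Name string }
type Node struct{ Name string }

// nodePackings re-keys pods from instance ID to node. podsByInstance maps an
// instance ID to the pods packed onto it; nodes maps the same IDs to nodes.
// An instance without a resolved node is reported as an error (an assumption
// of this sketch, not behavior taken from the PR).
func nodePackings(podsByInstance map[string][]*Pod, nodes map[string]*Node) (map[*Node][]*Pod, error) {
	result := make(map[*Node][]*Pod, len(nodes))
	for instanceID, pods := range podsByInstance {
		node, ok := nodes[instanceID]
		if !ok {
			return nil, fmt.Errorf("no node found for instance %s", instanceID)
		}
		result[node] = pods
	}
	return result, nil
}

func main() {
	web := &Pod{Name: "web"}
	db := &Pod{Name: "db"}
	podsByInstance := map[string][]*Pod{"i-1": {web}, "i-2": {db}}
	nodes := map[string]*Node{"i-1": {Name: "node-a"}, "i-2": {Name: "node-b"}}

	packed, err := nodePackings(podsByInstance, nodes)
	if err != nil {
		panic(err)
	}
	for node, pods := range packed {
		fmt.Println(node.Name, len(pods))
	}
}
```

The diff ranges over the nodes map and looks pods up by instance ID. The sketch ranges over the pods map instead, which surfaces an instance whose node was never resolved; whether that case should be an error is a design choice left open here.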

// GetTopologyDomains returns a set of supported domains.
// e.g. us-west-2 -> [ us-west-2a, us-west-2b ]
func (c *Capacity) GetTopologyDomains(ctx context.Context, key cloudprovider.TopologyKey) ([]string, error) {
switch key {
case cloudprovider.TopologyKeyZone:
zones, err := c.getZones(ctx)
zones, err := c.vpc.GetZones(ctx, c.spec.Cluster.Name)
if err != nil {
return nil, err
}
return zones, nil
case cloudprovider.TopologyKeySubnet:
subnets, err := c.getSubnetIds(ctx)
subnets, err := c.vpc.GetSubnetIds(ctx, c.spec.Cluster.Name)
if err != nil {
return nil, err
}
@@ -93,78 +95,3 @@ func (c *Capacity) GetTopologyDomains(ctx context.Context, key cloudprovider.Top
return nil, fmt.Errorf("unrecognized topology key %s", key)
}
}

func (c *Capacity) getConstrainedZonalSubnets(ctx context.Context, constraints *cloudprovider.CapacityConstraints) (map[string][]*ec2.Subnet, error) {
// 1. Get all subnets
zonalSubnets, err := c.subnetProvider.Get(ctx, c.spec.Cluster.Name)
if err != nil {
return nil, fmt.Errorf("getting zonal subnets, %w", err)
}
// 2. Return specific subnet if specified.
if subnetId, ok := constraints.Topology[cloudprovider.TopologyKeySubnet]; ok {
for zone, subnets := range zonalSubnets {
for _, subnet := range subnets {
if subnetId == *subnet.SubnetId {
return map[string][]*ec2.Subnet{zone: {subnet}}, nil
}
}
}
return nil, fmt.Errorf("no subnet exists named %s", subnetId)
}
// 3. Constrain by zones
constrainedZones, err := c.getConstrainedZones(ctx, constraints)
if err != nil {
return nil, fmt.Errorf("getting zones, %w", err)
}
constrainedZonalSubnets := map[string][]*ec2.Subnet{}
for zone, subnets := range zonalSubnets {
for _, constrainedZone := range constrainedZones {
if zone == constrainedZone {
constrainedZonalSubnets[constrainedZone] = subnets
}
}
}
if len(constrainedZonalSubnets) == 0 {
return nil, fmt.Errorf("failed to find viable zonal subnet pairing")
}
return constrainedZonalSubnets, nil
}

func (c *Capacity) getConstrainedZones(ctx context.Context, constraints *cloudprovider.CapacityConstraints) ([]string, error) {
// 1. Return zone if specified.
if zone, ok := constraints.Topology[cloudprovider.TopologyKeyZone]; ok {
return []string{zone}, nil
}
// 2. Return all zone options
zones, err := c.getZones(ctx)
if err != nil {
return nil, err
}
return zones, nil
}

func (c *Capacity) getZones(ctx context.Context) ([]string, error) {
zonalSubnets, err := c.subnetProvider.Get(ctx, c.spec.Cluster.Name)
if err != nil {
return nil, err
}
zones := []string{}
for zone := range zonalSubnets {
zones = append(zones, zone)
}
return zones, nil
}

func (c *Capacity) getSubnetIds(ctx context.Context) ([]string, error) {
zonalSubnets, err := c.subnetProvider.Get(ctx, c.spec.Cluster.Name)
if err != nil {
return nil, err
}
subnetIds := []string{}
for _, subnets := range zonalSubnets {
for _, subnet := range subnets {
subnetIds = append(subnetIds, *subnet.SubnetId)
}
}
return subnetIds, nil
}
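The zone and subnet constraint logic removed from this file boils down to a filter over a zone-to-subnets map. Below is a minimal sketch of that filter, assuming plain subnet ID strings instead of `*ec2.Subnet` and hypothetical topology keys `"zone"` and `"subnet"`. The VPCProvider implementation that replaces these helpers is not shown in this diff and may differ.

```go
package main

import "fmt"

// constrain narrows zonalSubnets (zone -> subnet IDs) using optional topology
// constraints. A subnet constraint takes precedence over a zone constraint;
// with neither, every zone is returned. The keys "subnet" and "zone" are
// placeholders for the cloudprovider topology keys.
func constrain(zonalSubnets map[string][]string, topology map[string]string) (map[string][]string, error) {
	// 1. Return the specific subnet if one is requested.
	if subnetID, ok := topology["subnet"]; ok {
		for zone, subnets := range zonalSubnets {
			for _, subnet := range subnets {
				if subnet == subnetID {
					return map[string][]string{zone: {subnet}}, nil
				}
			}
		}
		return nil, fmt.Errorf("no subnet exists named %s", subnetID)
	}
	// 2. Otherwise constrain by zone if one is requested.
	if zone, ok := topology["zone"]; ok {
		subnets, found := zonalSubnets[zone]
		if !found {
			return nil, fmt.Errorf("failed to find viable zonal subnet pairing")
		}
		return map[string][]string{zone: subnets}, nil
	}
	// 3. No constraints: all zones are viable, provided there are any.
	if len(zonalSubnets) == 0 {
		return nil, fmt.Errorf("failed to find viable zonal subnet pairing")
	}
	return zonalSubnets, nil
}

func main() {
	zonalSubnets := map[string][]string{
		"us-west-2a": {"subnet-1", "subnet-2"},
		"us-west-2b": {"subnet-3"},
	}
	constrained, err := constrain(zonalSubnets, map[string]string{"zone": "us-west-2a"})
	if err != nil {
		panic(err)
	}
	fmt.Println(constrained)
}
```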
32 changes: 19 additions & 13 deletions pkg/cloudprovider/aws/fleet/factory.go
@@ -20,6 +20,8 @@ import (
"github.com/aws/aws-sdk-go/service/ec2/ec2iface"
"github.com/aws/aws-sdk-go/service/iam/iamiface"
"github.com/awslabs/karpenter/pkg/apis/provisioning/v1alpha1"
"github.com/awslabs/karpenter/pkg/cloudprovider"
"github.com/awslabs/karpenter/pkg/cloudprovider/aws/fleet/packing"
"github.com/patrickmn/go-cache"
"sigs.k8s.io/controller-runtime/pkg/client"
)
@@ -36,8 +38,7 @@ const (
)

func NewFactory(ec2 ec2iface.EC2API, iam iamiface.IAMAPI, kubeClient client.Client) *Factory {
return &Factory{
ec2: ec2,
vpcProvider := &VPCProvider{
launchTemplateProvider: &LaunchTemplateProvider{
ec2: ec2,
launchTemplateCache: cache.New(CacheTTL, CacheCleanupInterval),
@@ -55,25 +56,30 @@ func NewFactory(ec2 ec2iface.EC2API, iam iamiface.IAMAPI, kubeClient client.Clie
ec2: ec2,
subnetCache: cache.New(CacheTTL, CacheCleanupInterval),
},
}
return &Factory{
ec2: ec2,
vpc: vpcProvider,
nodeFactory: &NodeFactory{ec2: ec2},
instanceProvider: &InstanceProvider{ec2: ec2},
instanceProvider: &InstanceProvider{ec2: ec2, vpc: vpcProvider},
packing: packing.NewPacker(ec2),
}
}

type Factory struct {
ec2 ec2iface.EC2API
launchTemplateProvider *LaunchTemplateProvider
nodeFactory *NodeFactory
subnetProvider *SubnetProvider
instanceProvider *InstanceProvider
ec2 ec2iface.EC2API
vpc *VPCProvider
Review comment (Contributor):
nit: I'd call this vpcProvider for parity with the others.

nodeFactory *NodeFactory
instanceProvider *InstanceProvider
packing cloudprovider.Packer
}

func (f *Factory) For(spec *v1alpha1.ProvisionerSpec) *Capacity {
return &Capacity{
spec: spec,
launchTemplateProvider: f.launchTemplateProvider,
nodeFactory: f.nodeFactory,
subnetProvider: f.subnetProvider,
instanceProvider: f.instanceProvider,
spec: spec,
nodeFactory: f.nodeFactory,
instanceProvider: f.instanceProvider,
vpc: f.vpc,
Review comment (Contributor):
Same naming comment applies here.

packing: f.packing,
}
}
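Typing the packing field as an interface lets Capacity be exercised without EC2. The sketch below assumes a one-method interface shaped like the Pack call in capacity.go. The real cloudprovider.Packer definition is not part of this diff, so `Packer`, `Packing`, `Pod`, `fakePacker`, and `packAll` are illustrative stand-ins.

```go
package main

import (
	"context"
	"fmt"
)

// Pod is a hypothetical stand-in for *v1.Pod.
type Pod struct{ Name string }

// Packing pairs a set of pods with the instance types able to fit them.
type Packing struct {
	Pods                []*Pod
	InstanceTypeOptions []string
}

// Packer is an assumed shape for the packing dependency.
type Packer interface {
	Pack(ctx context.Context, pods []*Pod) ([]*Packing, error)
}

// fakePacker places every pod in a single packing with fixed instance types,
// mirroring the placeholder behavior of the packer added in this PR.
type fakePacker struct{ instanceTypes []string }

func (f *fakePacker) Pack(ctx context.Context, pods []*Pod) ([]*Packing, error) {
	if len(pods) == 0 {
		return nil, nil
	}
	return []*Packing{{Pods: pods, InstanceTypeOptions: f.instanceTypes}}, nil
}

// packAll depends only on the interface, so any Packer can be injected.
// It uses a background context for brevity.
func packAll(p Packer, pods []*Pod) (int, error) {
	packings, err := p.Pack(context.Background(), pods)
	if err != nil {
		return 0, fmt.Errorf("computing bin packing, %w", err)
	}
	return len(packings), nil
}

func main() {
	packer := &fakePacker{instanceTypes: []string{"m5.large"}}
	n, err := packAll(packer, []*Pod{{Name: "a"}, {Name: "b"}})
	if err != nil {
		panic(err)
	}
	fmt.Println("packings:", n)
}
```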
24 changes: 3 additions & 21 deletions pkg/cloudprovider/aws/fleet/instance.go
@@ -22,38 +22,20 @@ import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/aws/aws-sdk-go/service/ec2/ec2iface"
"go.uber.org/zap"
v1 "k8s.io/api/core/v1"
)

type InstanceProvider struct {
ec2 ec2iface.EC2API
}

// Packing is a binpacking solution of pods and their viable instances types.
type Packing struct {
// Pods is a set of pods that is assigned to this packing.
Pods []*v1.Pod
// InstanceTypes is a set of instance types that is able to fit the pods.
InstanceTypeOptions []string
}

// GetPackings for the provided pods. Computes an ordered set of viable instance
// types for each packing of pods. Instance variety enables EC2 to make better cost and availability decisions.
func (p *InstanceProvider) GetPackings(ctx context.Context, pods []*v1.Pod, overhead v1.ResourceList) ([]*Packing, error) {
zap.S().Infof("Successfully packed %d pods onto %d nodes", len(pods), 1)
return []*Packing{{
InstanceTypeOptions: []string{"m5.large"}, // TODO, prioritize possible instance types
Pods: pods,
}}, nil
vpc *VPCProvider
}

// Create an instance given the constraints.
func (p *InstanceProvider) Create(ctx context.Context,
launchTemplate *ec2.LaunchTemplate,
instanceTypeOptions []string,
launchTemplate *ec2.LaunchTemplate,
Review comment (Contributor):
Any reason for the reordering here? Logically, launchTemplate makes more sense first, since it's the primary definition of the node, whereas the other fields are ancillary.

Reply (Contributor Author):
Sorry, this got jumbled up with changes going back and forth. I should have checked the final diff here in GitHub.

zonalSubnetOptions map[string][]*ec2.Subnet,
) (*string, error) {

Review comment (Contributor):
nit: why the newline here?

// 1. Construct override options.
var overrides []*ec2.FleetLaunchTemplateOverridesRequest
for _, instanceType := range instanceTypeOptions {
8 changes: 4 additions & 4 deletions pkg/cloudprovider/aws/fleet/nodefactory.go
@@ -33,7 +33,7 @@ type NodeFactory struct {
ec2 ec2iface.EC2API
}

func (n *NodeFactory) For(ctx context.Context, instanceIds []*string) ([]*v1.Node, error) {
func (n *NodeFactory) For(ctx context.Context, instanceIds []*string) (map[string]*v1.Node, error) {
prateekgogia marked this conversation as resolved.
// Backoff retry is necessary here because EC2's APIs are eventually
// consistent. In most cases, this call will only be made once.
for attempt := retry.Start(retry.Exponential{
@@ -53,11 +53,11 @@ func (n *NodeFactory) For(ctx context.Context, instanceIds []*string) ([]*v1.Nod
return nil, fmt.Errorf("failed to describe ec2 instances")
}

func (n *NodeFactory) nodesFrom(reservations []*ec2.Reservation) []*v1.Node {
var nodes []*v1.Node
func (n *NodeFactory) nodesFrom(reservations []*ec2.Reservation) map[string]*v1.Node {
nodes := make(map[string]*v1.Node) // writing to a nil map panics, so initialize before assigning
for _, reservation := range reservations {
for _, instance := range reservation.Instances {
nodes = append(nodes, n.nodeFrom(instance))
nodes[*instance.InstanceId] = n.nodeFrom(instance)
}
}
return nodes
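Returning a map keyed by instance ID instead of a slice removes the reliance on positional ordering between instances and packings. One detail matters with this shape: a map declared with `var` is nil, and assigning into a nil map panics, so the map needs `make` (or a literal) before the first write. A stdlib-only sketch follows, with hypothetical `Instance` and `Node` types standing in for `*ec2.Instance` and `*v1.Node`.

```go
package main

import "fmt"

// Instance and Node are hypothetical stand-ins for *ec2.Instance and *v1.Node.
type Instance struct {
	ID          string
	PrivateName string
}
type Node struct{ Name string }

// nodesByInstanceID flattens reservations (each a list of instances) into a
// map from instance ID to node.
func nodesByInstanceID(reservations [][]*Instance) map[string]*Node {
	// make is required here: `var nodes map[string]*Node` would be nil,
	// and the assignment below would panic at runtime.
	nodes := make(map[string]*Node)
	for _, instances := range reservations {
		for _, instance := range instances {
			nodes[instance.ID] = &Node{Name: instance.PrivateName}
		}
	}
	return nodes
}

func main() {
	reservations := [][]*Instance{
		{{ID: "i-1", PrivateName: "ip-10-0-0-1"}},
		{{ID: "i-2", PrivateName: "ip-10-0-0-2"}},
	}
	for id, node := range nodesByInstanceID(reservations) {
		fmt.Println(id, node.Name)
	}
}
```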
44 changes: 44 additions & 0 deletions pkg/cloudprovider/aws/fleet/packing/packing.go
@@ -0,0 +1,44 @@
/*
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package packing

import (
"context"

"github.com/aws/aws-sdk-go/service/ec2/ec2iface"
"github.com/awslabs/karpenter/pkg/cloudprovider"
"go.uber.org/zap"
v1 "k8s.io/api/core/v1"
)

type Packer struct {
ec2 ec2iface.EC2API
}

func NewPacker(ec2 ec2iface.EC2API) *Packer {
return &Packer{ec2: ec2}
}

// Get returns the packings for the provided pods. Computes an ordered set of viable instance
Review comment (Contributor):
nit: Get -> Pack. Also, I'm not sure it's an ordered set.

// types for each packing of pods. Instance variety enables EC2 to make better cost and availability decisions.
func (p *Packer) Pack(ctx context.Context, pods []*v1.Pod) ([]*cloudprovider.Packings, error) {
zap.S().Infof("Successfully packed %d pods onto %d nodes", len(pods), 1)
Review comment (Contributor):
nit: I've been leaning towards the Debug log level for all deep library steps (cloudprovider, etc.). I'd love for the autoscaler to emit a relatively small number of logs per minute unless there are errors or debug is enabled.

return []*cloudprovider.Packings{
{
InstanceTypeOptions: []string{"m5.large"}, // TODO, prioritize possible instance types
Pods: pods,
},
}, nil
}
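The log-level suggestion in the review comment on Pack can be illustrated without zap. The sketch below uses the standard library's log/slog (Go 1.21+) purely as a stand-in for the project's zap logger: a message logged at Debug is dropped when the handler's minimum level is Info and emitted when it is Debug. `logPacked` is a hypothetical helper, not part of the PR.

```go
package main

import (
	"bytes"
	"fmt"
	"log/slog"
)

// logPacked logs a packing summary at Debug level and returns whatever the
// logger wrote. When debugEnabled is false the handler's minimum level is
// Info, so the Debug record is filtered out.
func logPacked(debugEnabled bool, pods, nodes int) string {
	level := slog.LevelInfo
	if debugEnabled {
		level = slog.LevelDebug
	}
	var buf bytes.Buffer
	logger := slog.New(slog.NewTextHandler(&buf, &slog.HandlerOptions{Level: level}))
	logger.Debug("packed pods", "pods", pods, "nodes", nodes)
	return buf.String()
}

func main() {
	fmt.Printf("info level wrote %d bytes\n", len(logPacked(false, 3, 1)))
	fmt.Printf("debug level wrote %d bytes\n", len(logPacked(true, 3, 1)))
}
```

With zap, the corresponding change would be calling `zap.S().Debugf` in place of `zap.S().Infof`, with the effective level set by the logger configuration.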