Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Setup packing interface and refactored instance provider for launching constrained instances #216

Merged
merged 2 commits into from
Feb 4, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ test: ## Run tests

battletest: ## Run stronger tests
# Ensure all files have cyclo-complexity =< 10
gocyclo -over 10 ./pkg
gocyclo -over 11 ./pkg
# Run randomized, parallelized, racing, code coveraged, tests
ginkgo -r \
-cover -coverprofile=coverage.out -outputdir=. -coverpkg=./pkg/... \
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ github.com/aws/aws-lambda-go v1.13.3/go.mod h1:4UKl9IzQMoD+QF79YdCuzCwp8VbmG4VAQ
github.com/aws/aws-sdk-go v1.27.0/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo=
github.com/aws/aws-sdk-go v1.35.12 h1:qpxQ/DXfgsTNSYn8mUaCgQiJkCjBP8iHKw5ju+wkucU=
github.com/aws/aws-sdk-go v1.35.12/go.mod h1:tlPOdRjfxPBpNIwqDj61rmsnA85v9jc0Ps9+muhnW+k=
github.com/aws/aws-sdk-go-v2 v0.18.0 h1:qZ+woO4SamnH/eEbjM2IDLhRNwIwND/RQyVlBLp3Jqg=
github.com/aws/aws-sdk-go-v2 v0.18.0/go.mod h1:JWVYvqSMppoMJC0x5wdwiImzgXTI9FuZwxzkQq9wy+g=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 h1:xJ4a3vCFaGF/jqvzLMYoU8P317H5OQ+Via4RmuPwCS0=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
Expand Down
134 changes: 55 additions & 79 deletions pkg/cloudprovider/aws/fleet/capacity.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,91 +17,60 @@ package fleet
import (
"context"
"fmt"
"math/rand"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/aws/aws-sdk-go/service/ec2/ec2iface"
"github.com/awslabs/karpenter/pkg/apis/provisioning/v1alpha1"
"github.com/awslabs/karpenter/pkg/cloudprovider"
"go.uber.org/zap"
)

// Capacity cloud provider implementation using AWS Fleet.
type Capacity struct {
spec *v1alpha1.ProvisionerSpec
ec2 ec2iface.EC2API
launchTemplateProvider *LaunchTemplateProvider
ellistarn marked this conversation as resolved.
Show resolved Hide resolved
subnetProvider *SubnetProvider
nodeFactory *NodeFactory
instanceProvider *InstanceProvider
}

// Create a set of nodes given the constraints.
func (c *Capacity) Create(ctx context.Context, constraints *cloudprovider.CapacityConstraints) (cloudprovider.CapacityPacking, error) {
// 1. Select a zone
zone, err := c.selectZone(ctx, constraints)
if err != nil {
return nil, fmt.Errorf("getting zone, %w", err)
}

// 2. Select a subnet, limited to selected zone.
subnet, err := c.selectSubnet(ctx, zone, constraints)
if err != nil {
return nil, fmt.Errorf("getting subnet, %w", err)
}

// 3. Detect launch template.
// 1. Compute Constraints
launchTemplate, err := c.launchTemplateProvider.Get(ctx, c.spec.Cluster)
if err != nil {
return nil, fmt.Errorf("getting launch template, %w", err)
}

// 3. Create Fleet.
createFleetOutput, err := c.ec2.CreateFleetWithContext(ctx, &ec2.CreateFleetInput{
Type: aws.String(ec2.FleetTypeInstant),
TargetCapacitySpecification: &ec2.TargetCapacitySpecificationRequest{
DefaultTargetCapacityType: aws.String(ec2.DefaultTargetCapacityTypeOnDemand), // TODO support SPOT
TotalTargetCapacity: aws.Int64(1), // TODO construct this more intelligently
},
LaunchTemplateConfigs: []*ec2.FleetLaunchTemplateConfigRequest{{
LaunchTemplateSpecification: &ec2.FleetLaunchTemplateSpecificationRequest{
LaunchTemplateName: launchTemplate.LaunchTemplateName,
Version: aws.String("$Default"),
},
Overrides: []*ec2.FleetLaunchTemplateOverridesRequest{{
AvailabilityZone: aws.String(zone),
InstanceType: aws.String("m5.large"), // TODO construct this more intelligently
SubnetId: subnet.SubnetId,
}},
}},
})
instancePackings, err := c.instanceProvider.GetPackings(ctx, constraints.Pods, constraints.Overhead)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After thinking a little more about the change, I strongly feel GetPackings should live in a separate package that calculates the instance options and makes packing decisions. That package should only make packing decisions and let someone else call the CreateFleet API. The packing logic will become complex on its own, so that package can be the brain that decides packing and does this one thing really well.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

huge +1

if err != nil {
return nil, fmt.Errorf("creating fleet %w", err)
}
if len(createFleetOutput.Errors) > 0 {
// TODO hande case if createFleetOutput.Instances > 0
return nil, fmt.Errorf("errors while creating fleet, %v", createFleetOutput.Errors)
return nil, fmt.Errorf("computing bin packing, %w", err)
}
if len(createFleetOutput.Instances) == 0 {
return nil, fmt.Errorf("create fleet returned 0 instances")
zonalSubnets, err := c.getConstrainedZonalSubnets(ctx, constraints)
if err != nil {
return nil, fmt.Errorf("getting zonal subnets, %w", err)
}
// 4. Transform to Nodes.

// 2. Create Instances
var instanceIds []*string
for _, fleetInstance := range createFleetOutput.Instances {
instanceIds = append(instanceIds, fleetInstance.InstanceIds...)
for _, instancePacking := range instancePackings {
instanceId, err := c.instanceProvider.Create(ctx, launchTemplate, instancePacking.InstanceTypeOptions, zonalSubnets)
if err != nil {
// TODO Aggregate errors and continue
return nil, fmt.Errorf("creating capacity %w", err)
}
instanceIds = append(instanceIds, instanceId)
}

// 3. Convert to Nodes
nodes, err := c.nodeFactory.For(ctx, instanceIds)
if err != nil {
return nil, fmt.Errorf("determining nodes, %w", err)
}
zap.S().Infof("Successfully requested %d nodes", len(nodes))

// TODO Implement more sophisticated binpacking.
packing := cloudprovider.CapacityPacking{}
for _, node := range nodes {
packing[node] = constraints.Pods
// 4. Construct capacity packing
capacityPacking := cloudprovider.CapacityPacking{}
for i, node := range nodes {
capacityPacking[node] = instancePackings[i].Pods
}
return packing, nil
return capacityPacking, nil
}

// GetTopologyDomains returns a set of supported domains.
Expand All @@ -125,46 +94,53 @@ func (c *Capacity) GetTopologyDomains(ctx context.Context, key cloudprovider.Top
}
}

// seletZone chooses a zone for the given constraints.
func (c *Capacity) selectZone(ctx context.Context, constraints *cloudprovider.CapacityConstraints) (string, error) {
// 1. Return zone if specified.
if zone, ok := constraints.Topology[cloudprovider.TopologyKeyZone]; ok {
return zone, nil
}
// 2. Randomly choose from available zones.
zones, err := c.getZones(ctx)
if err != nil {
return "", err
}
return zones[rand.Intn(len(zones))], nil
}

// selectSubnet chooses a subnet for the given constraints.
func (c *Capacity) selectSubnet(ctx context.Context, zone string, constraints *cloudprovider.CapacityConstraints) (*ec2.Subnet, error) {
func (c *Capacity) getConstrainedZonalSubnets(ctx context.Context, constraints *cloudprovider.CapacityConstraints) (map[string][]*ec2.Subnet, error) {
// 1. Get all subnets
zonalSubnets, err := c.subnetProvider.Get(ctx, c.spec.Cluster.Name)
if err != nil {
return nil, err
return nil, fmt.Errorf("getting zonal subnets, %w", err)
}

// 2. Return specific subnet if specified.
if subnetId, ok := constraints.Topology[cloudprovider.TopologyKeySubnet]; ok {
for _, subnets := range zonalSubnets {
for zone, subnets := range zonalSubnets {
for _, subnet := range subnets {
if subnetId == *subnet.SubnetId {
return subnet, nil
return map[string][]*ec2.Subnet{zone: {subnet}}, nil
}
}
}
return nil, fmt.Errorf("no subnet exists named %s", subnetId)
}
// 3. Constrain by zones
constrainedZones, err := c.getConstrainedZones(ctx, constraints)
if err != nil {
return nil, fmt.Errorf("getting zones, %w", err)
}
constrainedZonalSubnets := map[string][]*ec2.Subnet{}
for zone, subnets := range zonalSubnets {
for _, constrainedZone := range constrainedZones {
if zone == constrainedZone {
constrainedZonalSubnets[constrainedZone] = subnets
}
}
}
if len(constrainedZonalSubnets) == 0 {
return nil, fmt.Errorf("failed to find viable zonal subnet pairing")
}
return constrainedZonalSubnets, nil
}

// 3. Return random subnet in the zone.
subnets, ok := zonalSubnets[zone]
if !ok || len(subnets) == 0 {
return nil, fmt.Errorf("no subnets exists for zone %s", zone)
func (c *Capacity) getConstrainedZones(ctx context.Context, constraints *cloudprovider.CapacityConstraints) ([]string, error) {
// 1. Return zone if specified.
if zone, ok := constraints.Topology[cloudprovider.TopologyKeyZone]; ok {
return []string{zone}, nil
}
return subnets[rand.Intn(len(subnets))], nil
// 2. Return all zone options
zones, err := c.getZones(ctx)
if err != nil {
return nil, err
}
return zones, nil
}

func (c *Capacity) getZones(ctx context.Context) ([]string, error) {
Expand Down
16 changes: 8 additions & 8 deletions pkg/cloudprovider/aws/fleet/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ import (

const (
// CacheTTL restricts QPS to AWS APIs to this interval for verifying setup resources.
CacheTTL = 5 * time.Minute
CacheTTL = 5 * time.Minute
// CacheCleanupInterval triggers cache cleanup (lazy eviction) at this interval.
CacheCleanupInterval = 10 * time.Minute
CacheCleanupInterval = 10 * time.Minute
// ClusterTagKeyFormat is set on all Kubernetes owned resources.
ClusterTagKeyFormat = "kubernetes.io/cluster/%s"
ClusterTagKeyFormat = "kubernetes.io/cluster/%s"
// KarpenterTagKeyFormat is set on all Karpenter owned resources.
KarpenterTagKeyFormat = "karpenter.sh/cluster/%s"
)
Expand All @@ -39,8 +39,8 @@ func NewFactory(ec2 ec2iface.EC2API, iam iamiface.IAMAPI, kubeClient client.Clie
return &Factory{
ec2: ec2,
launchTemplateProvider: &LaunchTemplateProvider{
launchTemplateCache: cache.New(CacheTTL, CacheCleanupInterval),
ec2: ec2,
launchTemplateCache: cache.New(CacheTTL, CacheCleanupInterval),
instanceProfileProvider: &InstanceProfileProvider{
iam: iam,
kubeClient: kubeClient,
Expand All @@ -55,9 +55,8 @@ func NewFactory(ec2 ec2iface.EC2API, iam iamiface.IAMAPI, kubeClient client.Clie
ec2: ec2,
subnetCache: cache.New(CacheTTL, CacheCleanupInterval),
},
nodeFactory: &NodeFactory{
ec2: ec2,
},
nodeFactory: &NodeFactory{ec2: ec2},
instanceProvider: &InstanceProvider{ec2: ec2},
}
}

Expand All @@ -66,14 +65,15 @@ type Factory struct {
launchTemplateProvider *LaunchTemplateProvider
nodeFactory *NodeFactory
subnetProvider *SubnetProvider
instanceProvider *InstanceProvider
}

func (f *Factory) For(spec *v1alpha1.ProvisionerSpec) *Capacity {
return &Capacity{
spec: spec,
ec2: f.ec2,
launchTemplateProvider: f.launchTemplateProvider,
nodeFactory: f.nodeFactory,
subnetProvider: f.subnetProvider,
instanceProvider: f.instanceProvider,
}
}
99 changes: 99 additions & 0 deletions pkg/cloudprovider/aws/fleet/instance.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package fleet

import (
"context"
"fmt"
"math/rand"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/aws/aws-sdk-go/service/ec2/ec2iface"
"go.uber.org/zap"
v1 "k8s.io/api/core/v1"
)

// InstanceProvider computes pod packings and launches instances through the
// EC2 API client it holds.
type InstanceProvider struct {
// ec2 is the EC2 client used by Create to issue the CreateFleet request.
ec2 ec2iface.EC2API
}

// Packing is a binpacking solution: a group of pods together with the
// instance types that are viable for hosting that group.
type Packing struct {
// Pods is the set of pods assigned to this packing.
Pods []*v1.Pod
// InstanceTypeOptions is the set of instance types able to fit the pods.
InstanceTypeOptions []string
}

// GetPackings returns the packings computed for the provided pods, each
// carrying an ordered set of viable instance types. Instance variety enables
// EC2 to make better cost and availability decisions.
//
// The current implementation is a placeholder: every pod is assigned to one
// packing whose only instance type option is "m5.large". ctx and overhead are
// accepted but not yet consulted, and the returned error is always nil.
func (p *InstanceProvider) GetPackings(ctx context.Context, pods []*v1.Pod, overhead v1.ResourceList) ([]*Packing, error) {
	// TODO, prioritize possible instance types
	everything := &Packing{
		Pods:                pods,
		InstanceTypeOptions: []string{"m5.large"},
	}
	packings := []*Packing{everything}

	zap.S().Infof("Successfully packed %d pods onto %d nodes", len(pods), len(packings))
	return packings, nil
}

// Create launches exactly one on-demand instance through an "instant" EC2
// Fleet request and returns its instance ID.
//
// One launch template override is generated for every (instance type, zone)
// combination so that EC2 can choose among them. instanceTypeOptions is
// iterated in the order given; zonalSubnetOptions maps a zone name to the
// subnets usable in that zone.
//
// An error is returned when launchTemplate is nil, when zones were supplied
// but none of them has a subnet, when the CreateFleet call fails or reports
// errors, or when the response does not contain exactly one instance ID.
func (p *InstanceProvider) Create(ctx context.Context,
	launchTemplate *ec2.LaunchTemplate,
	instanceTypeOptions []string,
	zonalSubnetOptions map[string][]*ec2.Subnet,
) (*string, error) {
	if launchTemplate == nil {
		return nil, fmt.Errorf("creating fleet, launch template is required")
	}

	// 1. Choose a single subnet per zone.
	// FleetAPI cannot span subnets from the same AZ, so randomize. The choice
	// is made once per zone (not once per override) so that every instance
	// type in a zone is paired with the same subnet. Zones without subnets are
	// skipped, since rand.Intn panics when its argument is zero.
	subnetIDByZone := make(map[string]string, len(zonalSubnetOptions))
	for zone, subnets := range zonalSubnetOptions {
		if len(subnets) == 0 {
			continue
		}
		subnetIDByZone[zone] = *subnets[rand.Intn(len(subnets))].SubnetId
	}
	if len(zonalSubnetOptions) > 0 && len(subnetIDByZone) == 0 {
		return nil, fmt.Errorf("creating fleet, no subnets available in any of the %d requested zones", len(zonalSubnetOptions))
	}

	// 2. Construct override options, preserving the instance type ordering.
	var overrides []*ec2.FleetLaunchTemplateOverridesRequest
	for _, instanceType := range instanceTypeOptions {
		for zone, subnetID := range subnetIDByZone {
			overrides = append(overrides, &ec2.FleetLaunchTemplateOverridesRequest{
				AvailabilityZone: aws.String(zone),
				InstanceType:     aws.String(instanceType),
				SubnetId:         aws.String(subnetID),
			})
		}
	}

	// 3. Create fleet
	createFleetOutput, err := p.ec2.CreateFleetWithContext(ctx, &ec2.CreateFleetInput{
		Type: aws.String(ec2.FleetTypeInstant),
		TargetCapacitySpecification: &ec2.TargetCapacitySpecificationRequest{
			DefaultTargetCapacityType: aws.String(ec2.DefaultTargetCapacityTypeOnDemand),
			TotalTargetCapacity:       aws.Int64(1),
		},
		LaunchTemplateConfigs: []*ec2.FleetLaunchTemplateConfigRequest{{
			LaunchTemplateSpecification: &ec2.FleetLaunchTemplateSpecificationRequest{
				LaunchTemplateName: launchTemplate.LaunchTemplateName,
				Version:            aws.String("$Default"),
			},
			Overrides: overrides,
		}},
	})
	if err != nil {
		return nil, fmt.Errorf("creating fleet %w", err)
	}
	// TODO aggregate errors
	if len(createFleetOutput.Errors) > 0 {
		return nil, fmt.Errorf("errors while creating fleet, %v", createFleetOutput.Errors)
	}
	if count := len(createFleetOutput.Instances); count != 1 {
		return nil, fmt.Errorf("expected 1 instance, but got %d", count)
	}
	if count := len(createFleetOutput.Instances[0].InstanceIds); count != 1 {
		return nil, fmt.Errorf("expected 1 instance ids, but got %d", count)
	}
	return createFleetOutput.Instances[0].InstanceIds[0], nil
}
Loading