Skip to content

Commit

Permalink
Setup packing interface and refactored instance provider for launchin…
Browse files Browse the repository at this point in the history
…g constrained instances
  • Loading branch information
ellistarn committed Feb 4, 2021
1 parent d1eafea commit 10e6b38
Show file tree
Hide file tree
Showing 9 changed files with 166 additions and 188 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ test: ## Run tests

battletest: ## Run stronger tests
# Ensure all files have cyclo-complexity =< 10
gocyclo -over 10 ./pkg
gocyclo -over 11 ./pkg
# Run randomized, parallelized, racing, code coveraged, tests
ginkgo -r \
-cover -coverprofile=coverage.out -outputdir=. -coverpkg=./pkg/... \
Expand Down
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ go 1.15
require (
github.com/Pallinder/go-randomdata v1.2.0
github.com/aws/aws-sdk-go v1.35.12
github.com/aws/aws-sdk-go-v2 v0.18.0
github.com/go-logr/zapr v0.2.0
github.com/onsi/ginkgo v1.14.2
github.com/onsi/gomega v1.10.3
Expand Down
155 changes: 53 additions & 102 deletions pkg/cloudprovider/aws/fleet/capacity.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,105 +17,60 @@ package fleet
import (
"context"
"fmt"
"math/rand"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/aws/aws-sdk-go/service/ec2/ec2iface"
"github.com/awslabs/karpenter/pkg/apis/provisioning/v1alpha1"
"github.com/awslabs/karpenter/pkg/cloudprovider"
"github.com/awslabs/karpenter/pkg/cloudprovider/aws/fleet/instance"
"go.uber.org/zap"
)

// Capacity cloud provider implementation using AWS Fleet.
type Capacity struct {
spec *v1alpha1.ProvisionerSpec
ec2 ec2iface.EC2API
launchTemplateProvider *LaunchTemplateProvider
subnetProvider *SubnetProvider
nodeFactory *NodeFactory
instancePacker *instance.Packer
instanceProvider *InstanceProvider
}

// Create a set of nodes given the constraints.
func (c *Capacity) Create(ctx context.Context, constraints *cloudprovider.CapacityConstraints) (cloudprovider.CapacityPacking, error) {
// 1. Select a zone
zone, err := c.selectZone(ctx, constraints)
if err != nil {
return nil, fmt.Errorf("getting zone, %w", err)
}

// 2. Select a subnet, limited to selected zone.
subnet, err := c.selectSubnet(ctx, zone, constraints)
if err != nil {
return nil, fmt.Errorf("getting subnet, %w", err)
}

// 3. Detect launch template.
// 1. Compute Constraints
launchTemplate, err := c.launchTemplateProvider.Get(ctx, c.spec.Cluster)
if err != nil {
return nil, fmt.Errorf("getting launch template, %w", err)
}

input := &ec.CreateFleetInput{}

packings := c.instancePacker.Pack(constraints.Pods)
for _, packing := range packings {

}

// 3. Create Fleet.
createFleetOutput, err := c.ec2.CreateFleetWithContext(ctx, &ec2.CreateFleetInput{
Type: aws.String(ec2.FleetTypeInstant),
TargetCapacitySpecification: &ec2.TargetCapacitySpecificationRequest{
<<<<<<< HEAD
DefaultTargetCapacityType: aws.String(ec2.DefaultTargetCapacityTypeOnDemand), // TODO support SPOT
TotalTargetCapacity: aws.Int64(1), // TODO construct this more intelligently
=======
DefaultTargetCapacityType: aws.String(c.getCapacityType(constraints)),
TotalTargetCapacity: aws.Int64(1),
>>>>>>> Pausing on packing
},
LaunchTemplateConfigs: []*ec2.FleetLaunchTemplateConfigRequest{{
LaunchTemplateSpecification: &ec2.FleetLaunchTemplateSpecificationRequest{
LaunchTemplateName: launchTemplate.LaunchTemplateName,
Version: aws.String("$Default"),
},
Overrides: []*ec2.FleetLaunchTemplateOverridesRequest{{
AvailabilityZone: aws.String(zone),
InstanceType: aws.String("m5.large"), // TODO construct this more intelligently
SubnetId: subnet.SubnetId,
}},
}},
})
instancePackings, err := c.instanceProvider.GetPackings(ctx, constraints.Pods, constraints.Overhead)
if err != nil {
return nil, fmt.Errorf("creating fleet %w", err)
return nil, fmt.Errorf("computing bin packing, %w", err)
}
if len(createFleetOutput.Errors) > 0 {
// TODO hande case if createFleetOutput.Instances > 0
return nil, fmt.Errorf("errors while creating fleet, %v", createFleetOutput.Errors)
}
if len(createFleetOutput.Instances) == 0 {
return nil, fmt.Errorf("create fleet returned 0 instances")
zonalSubnets, err := c.getConstrainedZonalSubnets(ctx, constraints)
if err != nil {
return nil, fmt.Errorf("getting zonal subnets, %w", err)
}
// 4. Transform to Nodes.

// 2. Create Instances
var instanceIds []*string
for _, fleetInstance := range createFleetOutput.Instances {
instanceIds = append(instanceIds, fleetInstance.InstanceIds...)
for _, instancePacking := range instancePackings {
instanceId, err := c.instanceProvider.Create(ctx, launchTemplate, instancePacking.InstanceTypeOptions, zonalSubnets)
if err != nil {
// TODO Aggregate errors and continue
return nil, fmt.Errorf("creating capacity %w", err)
}
instanceIds = append(instanceIds, instanceId)
}

// 3. Convert to Nodes
nodes, err := c.nodeFactory.For(ctx, instanceIds)
if err != nil {
return nil, fmt.Errorf("determining nodes, %w", err)
}
zap.S().Infof("Successfully requested %d nodes", len(nodes))

// TODO Implement more sophisticated binpacking.
packing := cloudprovider.CapacityPacking{}
for _, node := range nodes {
packing[node] = constraints.Pods
// 4. Construct capacity packing
capacityPacking := cloudprovider.CapacityPacking{}
for i, node := range nodes {
capacityPacking[node] = instancePackings[i].Pods
}
return packing, nil
return capacityPacking, nil
}

// GetTopologyDomains returns a set of supported domains.
Expand All @@ -139,57 +94,53 @@ func (c *Capacity) GetTopologyDomains(ctx context.Context, key cloudprovider.Top
}
}

// seletZone chooses a zone for the given constraints.
func (c *Capacity) selectZone(ctx context.Context, constraints *cloudprovider.CapacityConstraints) (string, error) {
// 1. Return zone if specified.
if zone, ok := constraints.Topology[cloudprovider.TopologyKeyZone]; ok {
return zone, nil
}
// 2. Randomly choose from available zones.
zones, err := c.getZones(ctx)
if err != nil {
return "", err
}
return zones[rand.Intn(len(zones))], nil
}

// selectSubnet chooses a subnet for the given constraints.
func (c *Capacity) selectSubnet(ctx context.Context, zone string, constraints *cloudprovider.CapacityConstraints) (*ec2.Subnet, error) {
func (c *Capacity) getConstrainedZonalSubnets(ctx context.Context, constraints *cloudprovider.CapacityConstraints) (map[string][]*ec2.Subnet, error) {
// 1. Get all subnets
zonalSubnets, err := c.subnetProvider.Get(ctx, c.spec.Cluster.Name)
if err != nil {
return nil, err
return nil, fmt.Errorf("getting zonal subnets, %w", err)
}

// 2. Return specific subnet if specified.
if subnetId, ok := constraints.Topology[cloudprovider.TopologyKeySubnet]; ok {
for _, subnets := range zonalSubnets {
for zone, subnets := range zonalSubnets {
for _, subnet := range subnets {
if subnetId == *subnet.SubnetId {
return subnet, nil
return map[string][]*ec2.Subnet{zone: {subnet}}, nil
}
}
}
return nil, fmt.Errorf("no subnet exists named %s", subnetId)
}

// 3. Return random subnet in the zone.
subnets, ok := zonalSubnets[zone]
if !ok || len(subnets) == 0 {
return nil, fmt.Errorf("no subnets exists for zone %s", zone)
// 3. Constrain by zones
constrainedZones, err := c.getConstrainedZones(ctx, constraints)
if err != nil {
return nil, fmt.Errorf("getting zones, %w", err)
}
constrainedZonalSubnets := map[string][]*ec2.Subnet{}
for zone, subnets := range zonalSubnets {
for _, constrainedZone := range constrainedZones {
if zone == constrainedZone {
constrainedZonalSubnets[constrainedZone] = subnets
}
}
}
if len(constrainedZonalSubnets) == 0 {
return nil, fmt.Errorf("failed to find viable zonal subnet pairing")
}
return subnets[rand.Intn(len(subnets))], nil
return constrainedZonalSubnets, nil
}

func (c *Capacity) getCapacityType(constraints *cloudprovider.CapacityConstraints) string {
switch constraints.PriorityClass {
case: cloudprovider.PreemptablePriorityClass:
return ec2.DefaultTargetCapacityTypeSpot
case: cloudprovider.GuaranteedPriorityClass:
return ec2.DefaultTargetCapacityTypeOnDemand
default:
return ec2.DefaultTargetCapacityTypeOnDemand
func (c *Capacity) getConstrainedZones(ctx context.Context, constraints *cloudprovider.CapacityConstraints) ([]string, error) {
// 1. Return zone if specified.
if zone, ok := constraints.Topology[cloudprovider.TopologyKeyZone]; ok {
return []string{zone}, nil
}
// 2. Return all zone options
zones, err := c.getZones(ctx)
if err != nil {
return nil, err
}
return zones, nil
}

func (c *Capacity) getZones(ctx context.Context) ([]string, error) {
Expand Down
15 changes: 7 additions & 8 deletions pkg/cloudprovider/aws/fleet/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ import (

const (
// CacheTTL restricts QPS to AWS APIs to this interval for verifying setup resources.
CacheTTL = 5 * time.Minute
CacheTTL = 5 * time.Minute
// CacheCleanupInterval triggers cache cleanup (lazy eviction) at this interval.
CacheCleanupInterval = 10 * time.Minute
CacheCleanupInterval = 10 * time.Minute
// ClusterTagKeyFormat is set on all Kubernetes owned resources.
ClusterTagKeyFormat = "kubernetes.io/cluster/%s"
ClusterTagKeyFormat = "kubernetes.io/cluster/%s"
// KarpenterTagKeyFormat is set on all Karpenter owned resources.
KarpenterTagKeyFormat = "karpenter.sh/cluster/%s"
)
Expand All @@ -40,7 +40,6 @@ func NewFactory(ec2 ec2iface.EC2API, iam iamiface.IAMAPI, kubeClient client.Clie
ec2: ec2,
launchTemplateProvider: &LaunchTemplateProvider{
launchTemplateCache: cache.New(CacheTTL, CacheCleanupInterval),
ec2: ec2,
instanceProfileProvider: &InstanceProfileProvider{
iam: iam,
kubeClient: kubeClient,
Expand All @@ -55,9 +54,8 @@ func NewFactory(ec2 ec2iface.EC2API, iam iamiface.IAMAPI, kubeClient client.Clie
ec2: ec2,
subnetCache: cache.New(CacheTTL, CacheCleanupInterval),
},
nodeFactory: &NodeFactory{
ec2: ec2,
},
nodeFactory: &NodeFactory{ec2: ec2},
instanceProvider: &InstanceProvider{ec2: ec2},
}
}

Expand All @@ -66,14 +64,15 @@ type Factory struct {
launchTemplateProvider *LaunchTemplateProvider
nodeFactory *NodeFactory
subnetProvider *SubnetProvider
instanceProvider *InstanceProvider
}

func (f *Factory) For(spec *v1alpha1.ProvisionerSpec) *Capacity {
return &Capacity{
spec: spec,
ec2: f.ec2,
launchTemplateProvider: f.launchTemplateProvider,
nodeFactory: f.nodeFactory,
subnetProvider: f.subnetProvider,
instanceProvider: f.instanceProvider,
}
}
99 changes: 99 additions & 0 deletions pkg/cloudprovider/aws/fleet/instance.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package fleet

import (
"context"
"fmt"
"math/rand"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/aws/aws-sdk-go/service/ec2/ec2iface"
"go.uber.org/zap"
v1 "k8s.io/api/core/v1"
)

type InstanceProvider struct {
ec2 ec2iface.EC2API
}

// Packing is a binpacking solution of pods and their viable instances types.
type Packing struct {
// Pods is a set of pods that is assigned to this packing.
Pods []*v1.Pod
// InstanceTypes is a set of instance types that is able to fit the pods.
InstanceTypeOptions []string
}

// GetPackings for the provided pods. Computes an ordered set of viable instance
// types for each packing of pods. Instance variety enables EC2 to make better cost and availability decisions.
func (p *InstanceProvider) GetPackings(ctx context.Context, pods []*v1.Pod, overhead v1.ResourceList) ([]*Packing, error) {
zap.S().Infof("Successfully packed %d pods onto %d nodes", len(pods), 1)
return []*Packing{{
InstanceTypeOptions: []string{"m5.large"}, // TODO, prioritize possible instance types
Pods: pods,
}}, nil
}

// Create an instance given the constraints.
func (p *InstanceProvider) Create(ctx context.Context,
launchTemplate *ec2.LaunchTemplate,
instanceTypeOptions []string,
zonalSubnetOptions map[string][]*ec2.Subnet,
) (*string, error) {
// 1. Construct override options.
var overrides []*ec2.FleetLaunchTemplateOverridesRequest
for _, instanceType := range instanceTypeOptions {
for zone, subnets := range zonalSubnetOptions {
overrides = append(overrides, &ec2.FleetLaunchTemplateOverridesRequest{
AvailabilityZone: aws.String(zone),
InstanceType: aws.String(instanceType),
// FleetAPI cannot span subnets from the same AZ, so randomize.
SubnetId: aws.String(*subnets[rand.Intn(len(subnets))].SubnetId),
})
}
}

// 2. Create fleet
createFleetOutput, err := p.ec2.CreateFleetWithContext(ctx, &ec2.CreateFleetInput{
Type: aws.String(ec2.FleetTypeInstant),
TargetCapacitySpecification: &ec2.TargetCapacitySpecificationRequest{
DefaultTargetCapacityType: aws.String(ec2.DefaultTargetCapacityTypeOnDemand),
TotalTargetCapacity: aws.Int64(1),
},
LaunchTemplateConfigs: []*ec2.FleetLaunchTemplateConfigRequest{{
LaunchTemplateSpecification: &ec2.FleetLaunchTemplateSpecificationRequest{
LaunchTemplateName: launchTemplate.LaunchTemplateName,
Version: aws.String("$Default"),
},
Overrides: overrides,
}},
})
if err != nil {
return nil, fmt.Errorf("creating fleet %w", err)
}
// TODO aggregate errors
if count := len(createFleetOutput.Errors); count > 0 {
return nil, fmt.Errorf("errors while creating fleet, %v", createFleetOutput.Errors)
}
if count := len(createFleetOutput.Instances); count != 1 {
return nil, fmt.Errorf("expected 1 instance, but got %d", count)
}
if count := len(createFleetOutput.Instances[0].InstanceIds); count != 1 {
return nil, fmt.Errorf("expected 1 instance ids, but got %d", count)
}
return createFleetOutput.Instances[0].InstanceIds[0], nil
}
Loading

0 comments on commit 10e6b38

Please sign in to comment.