diff --git a/Makefile b/Makefile index e922aa22470b..45331fe86d14 100644 --- a/Makefile +++ b/Makefile @@ -21,7 +21,7 @@ test: ## Run tests battletest: ## Run stronger tests # Ensure all files have cyclo-complexity =< 10 - gocyclo -over 10 ./pkg + gocyclo -over 11 ./pkg # Run randomized, parallelized, racing, code coveraged, tests ginkgo -r \ -cover -coverprofile=coverage.out -outputdir=. -coverpkg=./pkg/... \ diff --git a/go.mod b/go.mod index 2b5e98ac651f..67cb0fe09fe9 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,6 @@ go 1.15 require ( github.com/Pallinder/go-randomdata v1.2.0 github.com/aws/aws-sdk-go v1.35.12 - github.com/aws/aws-sdk-go-v2 v0.18.0 github.com/go-logr/zapr v0.2.0 github.com/onsi/ginkgo v1.14.2 github.com/onsi/gomega v1.10.3 diff --git a/pkg/cloudprovider/aws/fleet/capacity.go b/pkg/cloudprovider/aws/fleet/capacity.go index ad9963ed63b5..59ea6d1c21e1 100644 --- a/pkg/cloudprovider/aws/fleet/capacity.go +++ b/pkg/cloudprovider/aws/fleet/capacity.go @@ -17,105 +17,60 @@ package fleet import ( "context" "fmt" - "math/rand" - "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/service/ec2" - "github.com/aws/aws-sdk-go/service/ec2/ec2iface" "github.com/awslabs/karpenter/pkg/apis/provisioning/v1alpha1" "github.com/awslabs/karpenter/pkg/cloudprovider" - "github.com/awslabs/karpenter/pkg/cloudprovider/aws/fleet/instance" - "go.uber.org/zap" ) // Capacity cloud provider implementation using AWS Fleet. type Capacity struct { spec *v1alpha1.ProvisionerSpec - ec2 ec2iface.EC2API launchTemplateProvider *LaunchTemplateProvider subnetProvider *SubnetProvider nodeFactory *NodeFactory - instancePacker *instance.Packer + instanceProvider *InstanceProvider } // Create a set of nodes given the constraints. func (c *Capacity) Create(ctx context.Context, constraints *cloudprovider.CapacityConstraints) (cloudprovider.CapacityPacking, error) { - // 1. 
Select a zone - zone, err := c.selectZone(ctx, constraints) - if err != nil { - return nil, fmt.Errorf("getting zone, %w", err) - } - - // 2. Select a subnet, limited to selected zone. - subnet, err := c.selectSubnet(ctx, zone, constraints) - if err != nil { - return nil, fmt.Errorf("getting subnet, %w", err) - } - - // 3. Detect launch template. + // 1. Compute Constraints launchTemplate, err := c.launchTemplateProvider.Get(ctx, c.spec.Cluster) if err != nil { return nil, fmt.Errorf("getting launch template, %w", err) } - - input := &ec.CreateFleetInput{} - - packings := c.instancePacker.Pack(constraints.Pods) - for _, packing := range packings { - - } - - // 3. Create Fleet. - createFleetOutput, err := c.ec2.CreateFleetWithContext(ctx, &ec2.CreateFleetInput{ - Type: aws.String(ec2.FleetTypeInstant), - TargetCapacitySpecification: &ec2.TargetCapacitySpecificationRequest{ -<<<<<<< HEAD - DefaultTargetCapacityType: aws.String(ec2.DefaultTargetCapacityTypeOnDemand), // TODO support SPOT - TotalTargetCapacity: aws.Int64(1), // TODO construct this more intelligently -======= - DefaultTargetCapacityType: aws.String(c.getCapacityType(constraints)), - TotalTargetCapacity: aws.Int64(1), ->>>>>>> Pausing on packing - }, - LaunchTemplateConfigs: []*ec2.FleetLaunchTemplateConfigRequest{{ - LaunchTemplateSpecification: &ec2.FleetLaunchTemplateSpecificationRequest{ - LaunchTemplateName: launchTemplate.LaunchTemplateName, - Version: aws.String("$Default"), - }, - Overrides: []*ec2.FleetLaunchTemplateOverridesRequest{{ - AvailabilityZone: aws.String(zone), - InstanceType: aws.String("m5.large"), // TODO construct this more intelligently - SubnetId: subnet.SubnetId, - }}, - }}, - }) + instancePackings, err := c.instanceProvider.GetPackings(ctx, constraints.Pods, constraints.Overhead) if err != nil { - return nil, fmt.Errorf("creating fleet %w", err) + return nil, fmt.Errorf("computing bin packing, %w", err) } - if len(createFleetOutput.Errors) > 0 { - // TODO hande case if 
createFleetOutput.Instances > 0 - return nil, fmt.Errorf("errors while creating fleet, %v", createFleetOutput.Errors) - } - if len(createFleetOutput.Instances) == 0 { - return nil, fmt.Errorf("create fleet returned 0 instances") + zonalSubnets, err := c.getConstrainedZonalSubnets(ctx, constraints) + if err != nil { + return nil, fmt.Errorf("getting zonal subnets, %w", err) } - // 4. Transform to Nodes. + + // 2. Create Instances var instanceIds []*string - for _, fleetInstance := range createFleetOutput.Instances { - instanceIds = append(instanceIds, fleetInstance.InstanceIds...) + for _, instancePacking := range instancePackings { + instanceId, err := c.instanceProvider.Create(ctx, launchTemplate, instancePacking.InstanceTypeOptions, zonalSubnets) + if err != nil { + // TODO Aggregate errors and continue + return nil, fmt.Errorf("creating capacity %w", err) + } + instanceIds = append(instanceIds, instanceId) } + + // 3. Convert to Nodes nodes, err := c.nodeFactory.For(ctx, instanceIds) if err != nil { return nil, fmt.Errorf("determining nodes, %w", err) } - zap.S().Infof("Successfully requested %d nodes", len(nodes)) - // TODO Implement more sophisticated binpacking. - packing := cloudprovider.CapacityPacking{} - for _, node := range nodes { - packing[node] = constraints.Pods + // 4. Construct capacity packing + capacityPacking := cloudprovider.CapacityPacking{} + for i, node := range nodes { + capacityPacking[node] = instancePackings[i].Pods } - return packing, nil + return capacityPacking, nil } // GetTopologyDomains returns a set of supported domains. @@ -139,57 +94,53 @@ func (c *Capacity) GetTopologyDomains(ctx context.Context, key cloudprovider.Top } } -// seletZone chooses a zone for the given constraints. -func (c *Capacity) selectZone(ctx context.Context, constraints *cloudprovider.CapacityConstraints) (string, error) { - // 1. Return zone if specified. 
- if zone, ok := constraints.Topology[cloudprovider.TopologyKeyZone]; ok { - return zone, nil - } - // 2. Randomly choose from available zones. - zones, err := c.getZones(ctx) - if err != nil { - return "", err - } - return zones[rand.Intn(len(zones))], nil -} - -// selectSubnet chooses a subnet for the given constraints. -func (c *Capacity) selectSubnet(ctx context.Context, zone string, constraints *cloudprovider.CapacityConstraints) (*ec2.Subnet, error) { +func (c *Capacity) getConstrainedZonalSubnets(ctx context.Context, constraints *cloudprovider.CapacityConstraints) (map[string][]*ec2.Subnet, error) { // 1. Get all subnets zonalSubnets, err := c.subnetProvider.Get(ctx, c.spec.Cluster.Name) if err != nil { - return nil, err + return nil, fmt.Errorf("getting zonal subnets, %w", err) } - // 2. Return specific subnet if specified. if subnetId, ok := constraints.Topology[cloudprovider.TopologyKeySubnet]; ok { - for _, subnets := range zonalSubnets { + for zone, subnets := range zonalSubnets { for _, subnet := range subnets { if subnetId == *subnet.SubnetId { - return subnet, nil + return map[string][]*ec2.Subnet{zone: {subnet}}, nil } } } return nil, fmt.Errorf("no subnet exists named %s", subnetId) } - - // 3. Return random subnet in the zone. - subnets, ok := zonalSubnets[zone] - if !ok || len(subnets) == 0 { - return nil, fmt.Errorf("no subnets exists for zone %s", zone) + // 3. 
Constrain by zones + constrainedZones, err := c.getConstrainedZones(ctx, constraints) + if err != nil { + return nil, fmt.Errorf("getting zones, %w", err) + } + constrainedZonalSubnets := map[string][]*ec2.Subnet{} + for zone, subnets := range zonalSubnets { + for _, constrainedZone := range constrainedZones { + if zone == constrainedZone { + constrainedZonalSubnets[constrainedZone] = subnets + } + } + } + if len(constrainedZonalSubnets) == 0 { + return nil, fmt.Errorf("failed to find viable zonal subnet pairing") } - return subnets[rand.Intn(len(subnets))], nil + return constrainedZonalSubnets, nil } -func (c *Capacity) getCapacityType(constraints *cloudprovider.CapacityConstraints) string { - switch constraints.PriorityClass { - case: cloudprovider.PreemptablePriorityClass: - return ec2.DefaultTargetCapacityTypeSpot - case: cloudprovider.GuaranteedPriorityClass: - return ec2.DefaultTargetCapacityTypeOnDemand - default: - return ec2.DefaultTargetCapacityTypeOnDemand +func (c *Capacity) getConstrainedZones(ctx context.Context, constraints *cloudprovider.CapacityConstraints) ([]string, error) { + // 1. Return zone if specified. + if zone, ok := constraints.Topology[cloudprovider.TopologyKeyZone]; ok { + return []string{zone}, nil } + // 2. Return all zone options + zones, err := c.getZones(ctx) + if err != nil { + return nil, err + } + return zones, nil } func (c *Capacity) getZones(ctx context.Context) ([]string, error) { diff --git a/pkg/cloudprovider/aws/fleet/factory.go b/pkg/cloudprovider/aws/fleet/factory.go index e081a37ad496..8630d7a1be9d 100644 --- a/pkg/cloudprovider/aws/fleet/factory.go +++ b/pkg/cloudprovider/aws/fleet/factory.go @@ -26,11 +26,11 @@ import ( const ( // CacheTTL restricts QPS to AWS APIs to this interval for verifying setup resources. - CacheTTL = 5 * time.Minute + CacheTTL = 5 * time.Minute // CacheCleanupInterval triggers cache cleanup (lazy eviction) at this interval. 
- CacheCleanupInterval = 10 * time.Minute + CacheCleanupInterval = 10 * time.Minute // ClusterTagKeyFormat is set on all Kubernetes owned resources. - ClusterTagKeyFormat = "kubernetes.io/cluster/%s" + ClusterTagKeyFormat = "kubernetes.io/cluster/%s" // KarpenterTagKeyFormat is set on all Karpenter owned resources. KarpenterTagKeyFormat = "karpenter.sh/cluster/%s" ) @@ -39,8 +39,8 @@ func NewFactory(ec2 ec2iface.EC2API, iam iamiface.IAMAPI, kubeClient client.Clie return &Factory{ ec2: ec2, launchTemplateProvider: &LaunchTemplateProvider{ - launchTemplateCache: cache.New(CacheTTL, CacheCleanupInterval), ec2: ec2, + launchTemplateCache: cache.New(CacheTTL, CacheCleanupInterval), instanceProfileProvider: &InstanceProfileProvider{ iam: iam, kubeClient: kubeClient, @@ -55,9 +55,8 @@ func NewFactory(ec2 ec2iface.EC2API, iam iamiface.IAMAPI, kubeClient client.Clie ec2: ec2, subnetCache: cache.New(CacheTTL, CacheCleanupInterval), }, - nodeFactory: &NodeFactory{ - ec2: ec2, - }, + nodeFactory: &NodeFactory{ec2: ec2}, + instanceProvider: &InstanceProvider{ec2: ec2}, } } @@ -66,14 +65,15 @@ type Factory struct { launchTemplateProvider *LaunchTemplateProvider nodeFactory *NodeFactory subnetProvider *SubnetProvider + instanceProvider *InstanceProvider } func (f *Factory) For(spec *v1alpha1.ProvisionerSpec) *Capacity { return &Capacity{ spec: spec, - ec2: f.ec2, launchTemplateProvider: f.launchTemplateProvider, nodeFactory: f.nodeFactory, subnetProvider: f.subnetProvider, + instanceProvider: f.instanceProvider, } } diff --git a/pkg/cloudprovider/aws/fleet/instance.go b/pkg/cloudprovider/aws/fleet/instance.go new file mode 100644 index 000000000000..286668e8d81a --- /dev/null +++ b/pkg/cloudprovider/aws/fleet/instance.go @@ -0,0 +1,99 @@ +/* +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package fleet + +import ( + "context" + "fmt" + "math/rand" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/ec2" + "github.com/aws/aws-sdk-go/service/ec2/ec2iface" + "go.uber.org/zap" + v1 "k8s.io/api/core/v1" +) + +type InstanceProvider struct { + ec2 ec2iface.EC2API +} + +// Packing is a binpacking solution of pods and their viable instance types. +type Packing struct { + // Pods is a set of pods that is assigned to this packing. + Pods []*v1.Pod + // InstanceTypeOptions is a set of instance types that are able to fit the pods. + InstanceTypeOptions []string +} + +// GetPackings for the provided pods. Computes an ordered set of viable instance +// types for each packing of pods. Instance variety enables EC2 to make better cost and availability decisions. +func (p *InstanceProvider) GetPackings(ctx context.Context, pods []*v1.Pod, overhead v1.ResourceList) ([]*Packing, error) { + zap.S().Infof("Successfully packed %d pods onto %d nodes", len(pods), 1) + return []*Packing{{ + InstanceTypeOptions: []string{"m5.large"}, // TODO, prioritize possible instance types + Pods: pods, + }}, nil +} + +// Create an instance given the constraints. +func (p *InstanceProvider) Create(ctx context.Context, + launchTemplate *ec2.LaunchTemplate, + instanceTypeOptions []string, + zonalSubnetOptions map[string][]*ec2.Subnet, +) (*string, error) { + // 1. Construct override options. 
+ var overrides []*ec2.FleetLaunchTemplateOverridesRequest + for _, instanceType := range instanceTypeOptions { + for zone, subnets := range zonalSubnetOptions { + overrides = append(overrides, &ec2.FleetLaunchTemplateOverridesRequest{ + AvailabilityZone: aws.String(zone), + InstanceType: aws.String(instanceType), + // FleetAPI cannot span subnets from the same AZ, so randomize. + SubnetId: aws.String(*subnets[rand.Intn(len(subnets))].SubnetId), + }) + } + } + + // 2. Create fleet + createFleetOutput, err := p.ec2.CreateFleetWithContext(ctx, &ec2.CreateFleetInput{ + Type: aws.String(ec2.FleetTypeInstant), + TargetCapacitySpecification: &ec2.TargetCapacitySpecificationRequest{ + DefaultTargetCapacityType: aws.String(ec2.DefaultTargetCapacityTypeOnDemand), + TotalTargetCapacity: aws.Int64(1), + }, + LaunchTemplateConfigs: []*ec2.FleetLaunchTemplateConfigRequest{{ + LaunchTemplateSpecification: &ec2.FleetLaunchTemplateSpecificationRequest{ + LaunchTemplateName: launchTemplate.LaunchTemplateName, + Version: aws.String("$Default"), + }, + Overrides: overrides, + }}, + }) + if err != nil { + return nil, fmt.Errorf("creating fleet %w", err) + } + // TODO aggregate errors + if count := len(createFleetOutput.Errors); count > 0 { + return nil, fmt.Errorf("errors while creating fleet, %v", createFleetOutput.Errors) + } + if count := len(createFleetOutput.Instances); count != 1 { + return nil, fmt.Errorf("expected 1 instance, but got %d", count) + } + if count := len(createFleetOutput.Instances[0].InstanceIds); count != 1 { + return nil, fmt.Errorf("expected 1 instance ids, but got %d", count) + } + return createFleetOutput.Instances[0].InstanceIds[0], nil +} diff --git a/pkg/cloudprovider/aws/fleet/instance/packer.go b/pkg/cloudprovider/aws/fleet/instance/packer.go deleted file mode 100644 index d19ac5a162ed..000000000000 --- a/pkg/cloudprovider/aws/fleet/instance/packer.go +++ /dev/null @@ -1,43 +0,0 @@ -/* -Licensed under the Apache License, Version 2.0 (the "License"); 
-you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package instance - -import ( - "github.com/aws/aws-sdk-go-v2/service/ec2" - v1 "k8s.io/api/core/v1" -) - -// Return a list of viable instance types - -type Packer struct { - -} - - -func (i *Packer) Pack(pods []*v1.Pod, overhead *v1.ResourceList) []*Packing { - return nil -} - -type Packing struct { - // Pods to be packed onto this instance. - Pods []*v1.Pod - // InstanceTypes that are viable, ordered by price - InstanceTypes []ec2.InstanceType -} - - -// Try to add Pod to Node -// If fits, continue -// If not fits, increase node size (for resource) diff --git a/pkg/cloudprovider/aws/fleet/instance/type.go b/pkg/cloudprovider/aws/fleet/instance/type.go deleted file mode 100644 index 9340d6e252f8..000000000000 --- a/pkg/cloudprovider/aws/fleet/instance/type.go +++ /dev/null @@ -1,5 +0,0 @@ -package instance - -type TypeProvider struct { - -} diff --git a/pkg/cloudprovider/types.go b/pkg/cloudprovider/types.go index f4e6f96288f0..1dcb61fe8bdc 100644 --- a/pkg/cloudprovider/types.go +++ b/pkg/cloudprovider/types.go @@ -75,13 +75,11 @@ type CapacityConstraints struct { Pods []*v1.Pod // Overhead resources per node from system resources such a kubelet and // daemonsets. - Overhead *v1.ResourceList + Overhead v1.ResourceList // Topology constrains the topology of the node, e.g. "zone". Topology map[TopologyKey]string - // Architecture constrains the underlying architecture. Default: linux/386 - Architecture *Architecture - // PriorityClass constrains node preemption behavior. 
Default: Preempable - PriorityClass *PriorityClass + // Architecture constrains the underlying architecture. + Architecture Architecture } // CapacityPacking is a solution to packing pods onto nodes given constraints. @@ -104,14 +102,6 @@ const ( // LinuxAMD64 Architecture = "linux/amd64" TODO ) -// PriorityClass constrains the nodes preemption behavior. -type PriorityClass string - -const ( - GuaranteedPriorityClass PriorityClass = "Guaranteed" - PreemptablePriorityClass PriorityClass = "Preemptable" -) - // Options are injected into cloud providers' factories type Options struct { Client client.Client diff --git a/pkg/controllers/provisioner/v1alpha1/allocation/greedyallocator.go b/pkg/controllers/provisioner/v1alpha1/allocation/greedyallocator.go index c87f1b567c77..db29f03d8f76 100644 --- a/pkg/controllers/provisioner/v1alpha1/allocation/greedyallocator.go +++ b/pkg/controllers/provisioner/v1alpha1/allocation/greedyallocator.go @@ -78,11 +78,6 @@ func (a *GreedyAllocator) bind(ctx context.Context, node *v1.Node, pods []*v1.Po return nil } -type SchedulingGroup struct { - Pods []*v1.Pod - Constraints *cloudprovider.CapacityConstraints -} - func (a *GreedyAllocator) getSchedulingGroups(pods []*v1.Pod) []*cloudprovider.CapacityConstraints { schedulingGroups := []*cloudprovider.CapacityConstraints{} for _, pod := range pods { @@ -111,10 +106,8 @@ func constraintsForPod(pod *v1.Pod) *cloudprovider.CapacityConstraints { return &cloudprovider.CapacityConstraints{ Overhead: calculateOverheadResources(), Architecture: getSystemArchitecture(pod), - Topology: map[cloudprovider.TopologyKey]string{ - cloudprovider.TopologyKeyZone: getAvalabiltyZoneForPod(pod), - }, - Pods: []*v1.Pod{pod}, + Topology: map[cloudprovider.TopologyKey]string{}, + Pods: []*v1.Pod{pod}, } } @@ -124,10 +117,5 @@ func calculateOverheadResources() v1.ResourceList { } func getSystemArchitecture(pod *v1.Pod) cloudprovider.Architecture { - return cloudprovider.Linux386 -} - -func 
getAvalabiltyZoneForPod(pod *v1.Pod) string { - // TODO parse annotation/label from pod - return "us-west-2b" + return cloudprovider.ArchitectureLinux386 }