From be0ecdea5f8ea8e4e3db7007888f9652e2756cf1 Mon Sep 17 00:00:00 2001
From: Prateek Gogia
Date: Tue, 2 Feb 2021 19:07:12 -0600
Subject: [PATCH] Create node objects and bind pods to nodes (#215)

* Create node objects and bind pods to nodes

* fix error check

* fix error check
---
 cmd/controller/main.go                     | 20 +++--
 config/rbac/role.yaml                      |  6 ++
 pkg/cloudprovider/aws/fleet/capacity.go    | 12 ++-
 pkg/cloudprovider/aws/fleet/nodefactory.go |  4 +-
 .../v1alpha1/allocation/greedyallocator.go | 73 ++++++++++++++++++-
 .../provisioner/v1alpha1/controller.go     |  3 +-
 6 files changed, 104 insertions(+), 14 deletions(-)

diff --git a/cmd/controller/main.go b/cmd/controller/main.go
index 1d010da9a485..5e64be437da0 100644
--- a/cmd/controller/main.go
+++ b/cmd/controller/main.go
@@ -19,10 +19,10 @@ import (
 	"github.com/awslabs/karpenter/pkg/metrics/producers"
 	"github.com/awslabs/karpenter/pkg/utils/log"
 
-	"go.uber.org/zap"
 	"k8s.io/apimachinery/pkg/runtime"
 	clientgoscheme "k8s.io/client-go/kubernetes/scheme"
+	corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
 	_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
 	controllerruntime "sigs.k8s.io/controller-runtime"
 	controllerruntimezap "sigs.k8s.io/controller-runtime/pkg/log/zap"
@@ -55,7 +55,6 @@ func main() {
 	flag.Parse()
 
 	log.Setup(controllerruntimezap.UseDevMode(options.EnableVerboseLogging))
-
 	manager := controllers.NewManagerOrDie(controllerruntime.GetConfigOrDie(), controllerruntime.Options{
 		LeaderElection:   true,
 		LeaderElectionID: "karpenter-leader-election",
@@ -69,15 +68,20 @@ func main() {
 	metricsClientFactory := metricsclients.NewFactoryOrDie(options.PrometheusURI)
 	autoscalerFactory := autoscaler.NewFactoryOrDie(metricsClientFactory, manager.GetRESTMapper(), manager.GetConfig())
 
-	if err := manager.Register(
+	client, err := corev1.NewForConfig(manager.GetConfig())
+	log.PanicIfError(err, "Failed creating kube client")
+
+	err = manager.Register(
 		&horizontalautoscalerv1alpha1.Controller{AutoscalerFactory: autoscalerFactory},
 		&scalablenodegroupv1alpha1.Controller{CloudProvider: cloudProviderFactory},
 		&metricsproducerv1alpha1.Controller{ProducerFactory: metricsProducerFactory},
 		&provisionerv1alpha1.Controller{
-			Client:    manager.GetClient(),
-			Allocator: &allocation.GreedyAllocator{CloudProvider: cloudProviderFactory},
+			Client: manager.GetClient(),
+			Allocator: &allocation.GreedyAllocator{
+				CloudProvider: cloudProviderFactory,
+				CoreV1Client:  client,
+			},
 		},
-	).Start(controllerruntime.SetupSignalHandler()); err != nil {
-		zap.S().Panicf("Unable to start manager, %w", err)
-	}
+	).Start(controllerruntime.SetupSignalHandler())
+	log.PanicIfError(err, "Unable to start manager")
 }
diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml
index 87c51cbcbad5..a7e71b5ac657 100644
--- a/config/rbac/role.yaml
+++ b/config/rbac/role.yaml
@@ -78,3 +78,9 @@ rules:
   - nodes
   verbs:
   - create
+- apiGroups:
+  - ""
+  resources:
+  - pods/binding
+  verbs:
+  - create
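Note on the RBAC addition: the new pods/binding rule authorizes the Bind call
this patch introduces in the allocator, since binding a pod to a node is
modeled as a create on the pod's binding subresource. A minimal sketch of the
call shape with the typed client-go CoreV1 client (variable names here are
illustrative, not from the patch):

    // client is a *corev1.CoreV1Client, ctx a context.Context.
    err := client.Pods(pod.Namespace).Bind(ctx, &v1.Binding{
    	ObjectMeta: metav1.ObjectMeta{Name: pod.Name, Namespace: pod.Namespace},
    	Target:     v1.ObjectReference{Kind: "Node", Name: node.Name},
    }, metav1.CreateOptions{})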
diff --git a/pkg/cloudprovider/aws/fleet/capacity.go b/pkg/cloudprovider/aws/fleet/capacity.go
index 3989505741c5..42e215c7858f 100644
--- a/pkg/cloudprovider/aws/fleet/capacity.go
+++ b/pkg/cloudprovider/aws/fleet/capacity.go
@@ -62,7 +62,7 @@ func (c *Capacity) Create(ctx context.Context, constraints *cloudprovider.Capaci
 		Type: aws.String(ec2.FleetTypeInstant),
 		TargetCapacitySpecification: &ec2.TargetCapacitySpecificationRequest{
 			DefaultTargetCapacityType: aws.String(ec2.DefaultTargetCapacityTypeOnDemand), // TODO support SPOT
-			TotalTargetCapacity: aws.Int64(1), // TODO construct this more intelligently
+			TotalTargetCapacity:       aws.Int64(1), // TODO construct this more intelligently
 		},
 		LaunchTemplateConfigs: []*ec2.FleetLaunchTemplateConfigRequest{{
 			LaunchTemplateSpecification: &ec2.FleetLaunchTemplateSpecificationRequest{
@@ -77,9 +77,15 @@ func (c *Capacity) Create(ctx context.Context, constraints *cloudprovider.Capaci
 		}},
 	})
 	if err != nil {
-		return nil, fmt.Errorf("creating fleet, %w", err)
+		return nil, fmt.Errorf("creating fleet %w", err)
+	}
+	if len(createFleetOutput.Errors) > 0 {
+		// TODO handle the case where createFleetOutput.Instances > 0
+		return nil, fmt.Errorf("errors while creating fleet, %v", createFleetOutput.Errors)
+	}
+	if len(createFleetOutput.Instances) == 0 {
+		return nil, fmt.Errorf("create fleet returned 0 instances")
 	}
-
 	// 4. Transform to Nodes.
 	var instanceIds []*string
 	for _, fleetInstance := range createFleetOutput.Instances {
diff --git a/pkg/cloudprovider/aws/fleet/nodefactory.go b/pkg/cloudprovider/aws/fleet/nodefactory.go
index 723761b37210..32671c2f5259 100644
--- a/pkg/cloudprovider/aws/fleet/nodefactory.go
+++ b/pkg/cloudprovider/aws/fleet/nodefactory.go
@@ -74,7 +74,9 @@ func (n *NodeFactory) nodeFrom(instance *ec2.Instance) *v1.Node {
 		Status: v1.NodeStatus{
 			Allocatable: v1.ResourceList{
 				// TODO, This value is necessary to avoid OutOfPods failure state. Find a way to set this (and cpu/mem) correctly
-				v1.ResourcePods: resource.MustParse("100"),
+				v1.ResourcePods:   resource.MustParse("100"),
+				v1.ResourceCPU:    resource.MustParse("2"),
+				v1.ResourceMemory: resource.MustParse("2Gi"),
 			},
 		},
 	}
diff --git a/pkg/controllers/provisioner/v1alpha1/allocation/greedyallocator.go b/pkg/controllers/provisioner/v1alpha1/allocation/greedyallocator.go
index 61dccf42a8ac..c6e25640cf96 100644
--- a/pkg/controllers/provisioner/v1alpha1/allocation/greedyallocator.go
+++ b/pkg/controllers/provisioner/v1alpha1/allocation/greedyallocator.go
@@ -23,6 +23,8 @@ import (
 	"go.uber.org/zap"
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/resource"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
 )
 
 var _ Allocator = &GreedyAllocator{}
@@ -30,6 +32,7 @@ var _ Allocator = &GreedyAllocator{}
 // GreedyAllocator iteratively assigns pods to scheduling groups and then creates capacity for each group.
 type GreedyAllocator struct {
 	CloudProvider cloudprovider.Factory
+	CoreV1Client  *corev1.CoreV1Client
 }
 
 // Allocate takes a list of unschedulable pods and creates nodes based on
@@ -41,13 +44,80 @@ func (a *GreedyAllocator) Allocate(provisioner *v1alpha1.Provisioner, pods []*v1
 	zap.S().Infof("Allocating %d pending pods from %d constraint groups", len(pods), len(groups))
 	// 2. Group pods into equally schedulable constraint group
 	for _, group := range groups {
-		if _, err := a.CloudProvider.CapacityFor(&provisioner.Spec).Create(context.TODO(), group.Constraints); err != nil {
+		nodes, err := a.CloudProvider.CapacityFor(&provisioner.Spec).Create(context.TODO(), group.Constraints)
+		// TODO accumulate errors instead of failing fast if one request fails.
+		if err != nil {
 			return fmt.Errorf("while creating capacity, %w", err)
 		}
+		if err := a.createNodesAndAssignPods(nodes, group.Pods); err != nil {
+			return fmt.Errorf("assigning pods to nodes, %w", err)
+		}
 	}
 	return nil
 }
+
+func (a *GreedyAllocator) createNodesAndAssignPods(nodes []*v1.Node, pods []*v1.Pod) error {
+	// For each new node: create the node object in the cluster, then bind as
+	// many of the remaining pods to it as will fit.
+	// TODO score nodes and pods from largest to smallest before assigning.
+	remainingPods := make([]*v1.Pod, len(pods))
+	copy(remainingPods, pods)
+	for _, node := range nodes {
+		err := a.createNodeObject(node)
+		if err != nil {
+			return fmt.Errorf("creating node object, %w", err)
+		}
+		remainingPods, err = a.assignPodsToNodes(node, remainingPods)
+		if err != nil {
+			return fmt.Errorf("binding pods to node %s, %w", node.Name, err)
+		}
+	}
+	if len(remainingPods) > 0 {
+		// This should not happen.
+		return fmt.Errorf("unable to assign %d pods to %d nodes", len(remainingPods), len(nodes))
+	}
+	zap.S().Infof("Successfully assigned %d pods to %d nodes", len(pods), len(nodes))
 	return nil
 }
 
+func (a *GreedyAllocator) assignPodsToNodes(node *v1.Node, pods []*v1.Pod) ([]*v1.Pod, error) {
+	remainingPods := make([]*v1.Pod, 0)
+	for _, pod := range pods {
+		if !canFitPodOnNode(node, pod) {
+			remainingPods = append(remainingPods, pod)
+			continue
+		}
+		if err := a.bindPodToNode(node, pod); err != nil {
+			return nil, fmt.Errorf("binding pod to node, %w", err)
+		}
+		zap.S().Infof("Bound pod %s to node %s", pod.Namespace+"/"+pod.Name, node.Name)
+	}
+	return remainingPods, nil
+}
+
+func (a *GreedyAllocator) bindPodToNode(node *v1.Node, pod *v1.Pod) error {
+	return a.CoreV1Client.Pods(pod.Namespace).Bind(context.TODO(), &v1.Binding{
+		TypeMeta:   pod.TypeMeta,
+		ObjectMeta: pod.ObjectMeta,
+		Target: v1.ObjectReference{
+			Name: node.Name,
+		},
+	}, metav1.CreateOptions{})
+}
+
+func canFitPodOnNode(node *v1.Node, pod *v1.Pod) bool {
+	// TODO podResources := calculateResourcesForPod(pod)
+	return true
+}
+
+func (a *GreedyAllocator) createNodeObject(node *v1.Node) error {
+	_, err := a.CoreV1Client.Nodes().Create(context.TODO(), node, metav1.CreateOptions{})
+	return err
+}
+
 type SchedulingGroup struct {
 	Pods        []*v1.Pod
 	Constraints *cloudprovider.CapacityConstraints
@@ -88,6 +158,7 @@ func schedulingGroupForPod(pod *v1.Pod) *SchedulingGroup {
 				cloudprovider.TopologyKeyZone: getAvalabiltyZoneForPod(pod),
 			},
 		},
+		Pods: []*v1.Pod{pod},
 	}
 	return group
 }
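canFitPodOnNode above is stubbed to return true, so every pod in a group is
bound to the first node created for it. A plausible shape for the TODO, shown
as a sketch only (it is not part of this patch, and it ignores pods bound
earlier in the same pass), is to compare summed container requests against the
node's Status.Allocatable:

    func canFitPodOnNode(node *v1.Node, pod *v1.Pod) bool {
    	// Sum the requests of all containers in the pod.
    	requested := v1.ResourceList{}
    	for _, container := range pod.Spec.Containers {
    		for name, quantity := range container.Resources.Requests {
    			total := requested[name]
    			total.Add(quantity)
    			requested[name] = total
    		}
    	}
    	// The pod fits if allocatable covers every requested resource.
    	for name, quantity := range requested {
    		allocatable, ok := node.Status.Allocatable[name]
    		if !ok || allocatable.Cmp(quantity) < 0 {
    			return false
    		}
    	}
    	return true
    }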
diff --git a/pkg/controllers/provisioner/v1alpha1/controller.go b/pkg/controllers/provisioner/v1alpha1/controller.go
index 6e688563239d..455325fd5fa3 100644
--- a/pkg/controllers/provisioner/v1alpha1/controller.go
+++ b/pkg/controllers/provisioner/v1alpha1/controller.go
@@ -61,9 +61,10 @@ func (c *Controller) Reconcile(object controllers.Object) error {
 
 	unschedulable := []*v1.Pod{}
 	for _, pod := range pods.Items {
+		tempPod := pod
 		for _, condition := range pod.Status.Conditions {
 			if condition.Type == v1.PodScheduled && condition.Reason == v1.PodReasonUnschedulable {
-				unschedulable = append(unschedulable, &pod)
+				unschedulable = append(unschedulable, &tempPod)
 			}
 		}
 	}
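A note on the tempPod copy in the last hunk: under the range semantics in
effect at the time of this patch, Go reuses a single loop variable across
iterations, so appending &pod directly would make every element of
unschedulable point at the same Pod struct, holding the final iteration's
value by the time the slice is used. Copying into a per-iteration variable
gives each append a distinct address:

    for _, pod := range pods.Items {
    	tempPod := pod // fresh copy per iteration; &tempPod is a unique address
    	// ... condition checks ...
    	unschedulable = append(unschedulable, &tempPod)
    }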