Create node objects and bind pods to nodes (aws#215)
* Create node objects and bind pods to nodes

* fix error check

* fix error check
prateekgogia authored and ellistarn committed Feb 4, 2021
1 parent 232ef1d commit be0ecde
Showing 6 changed files with 104 additions and 14 deletions.
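In essence, the allocator now registers each freshly launched instance as a Node object and binds pending pods to it through the pods/binding subresource. A condensed, hedged sketch of that flow with client-go's typed CoreV1 client (the package and helper names here are illustrative, not part of the commit):

package example

import (
	"context"
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
)

// registerAndBind mirrors what GreedyAllocator.createNodeObject and bindPodToNode
// do in this commit: create the Node object for a provisioned instance, then bind
// a pending pod to it via the pods/binding subresource.
func registerAndBind(ctx context.Context, client *corev1.CoreV1Client, node *v1.Node, pod *v1.Pod) error {
	if _, err := client.Nodes().Create(ctx, node, metav1.CreateOptions{}); err != nil {
		return fmt.Errorf("creating node object, %w", err)
	}
	binding := &v1.Binding{
		ObjectMeta: metav1.ObjectMeta{Name: pod.Name, Namespace: pod.Namespace},
		Target:     v1.ObjectReference{Kind: "Node", Name: node.Name},
	}
	if err := client.Pods(pod.Namespace).Bind(ctx, binding, metav1.CreateOptions{}); err != nil {
		return fmt.Errorf("binding pod %s/%s to node %s, %w", pod.Namespace, pod.Name, node.Name, err)
	}
	return nil
}

The RBAC change below adds create on the pods/binding subresource, which this flow requires on top of the existing create permission on nodes.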
20 changes: 12 additions & 8 deletions cmd/controller/main.go
@@ -19,10 +19,10 @@ import (
"github.com/awslabs/karpenter/pkg/metrics/producers"

"github.com/awslabs/karpenter/pkg/utils/log"
"go.uber.org/zap"
"k8s.io/apimachinery/pkg/runtime"

clientgoscheme "k8s.io/client-go/kubernetes/scheme"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
controllerruntime "sigs.k8s.io/controller-runtime"
controllerruntimezap "sigs.k8s.io/controller-runtime/pkg/log/zap"
@@ -55,7 +55,6 @@ func main() {
flag.Parse()

log.Setup(controllerruntimezap.UseDevMode(options.EnableVerboseLogging))

manager := controllers.NewManagerOrDie(controllerruntime.GetConfigOrDie(), controllerruntime.Options{
LeaderElection: true,
LeaderElectionID: "karpenter-leader-election",
@@ -69,15 +68,20 @@ func main() {
metricsClientFactory := metricsclients.NewFactoryOrDie(options.PrometheusURI)
autoscalerFactory := autoscaler.NewFactoryOrDie(metricsClientFactory, manager.GetRESTMapper(), manager.GetConfig())

if err := manager.Register(
client, err := corev1.NewForConfig(manager.GetConfig())
log.PanicIfError(err, "Failed creating kube client")

err = manager.Register(
&horizontalautoscalerv1alpha1.Controller{AutoscalerFactory: autoscalerFactory},
&scalablenodegroupv1alpha1.Controller{CloudProvider: cloudProviderFactory},
&metricsproducerv1alpha1.Controller{ProducerFactory: metricsProducerFactory},
&provisionerv1alpha1.Controller{
Client: manager.GetClient(),
Allocator: &allocation.GreedyAllocator{CloudProvider: cloudProviderFactory},
Client: manager.GetClient(),
Allocator: &allocation.GreedyAllocator{
CloudProvider: cloudProviderFactory,
CoreV1Client: client,
},
},
).Start(controllerruntime.SetupSignalHandler()); err != nil {
zap.S().Panicf("Unable to start manager, %w", err)
}
).Start(controllerruntime.SetupSignalHandler())
log.PanicIfError(err, "Unable to start manager")
}
6 changes: 6 additions & 0 deletions config/rbac/role.yaml
@@ -78,3 +78,9 @@ rules:
- nodes
verbs:
- create
- apiGroups:
- ""
resources:
- pods/binding
verbs:
- create
12 changes: 9 additions & 3 deletions pkg/cloudprovider/aws/fleet/capacity.go
@@ -62,7 +62,7 @@ func (c *Capacity) Create(ctx context.Context, constraints *cloudprovider.Capaci
Type: aws.String(ec2.FleetTypeInstant),
TargetCapacitySpecification: &ec2.TargetCapacitySpecificationRequest{
DefaultTargetCapacityType: aws.String(ec2.DefaultTargetCapacityTypeOnDemand), // TODO support SPOT
TotalTargetCapacity: aws.Int64(1), // TODO construct this more intelligently
TotalTargetCapacity: aws.Int64(1), // TODO construct this more intelligently
},
LaunchTemplateConfigs: []*ec2.FleetLaunchTemplateConfigRequest{{
LaunchTemplateSpecification: &ec2.FleetLaunchTemplateSpecificationRequest{
@@ -77,9 +77,15 @@ func (c *Capacity) Create(ctx context.Context, constraints *cloudprovider.Capaci
}},
})
if err != nil {
return nil, fmt.Errorf("creating fleet, %w", err)
return nil, fmt.Errorf("creating fleet %w", err)
}
if len(createFleetOutput.Errors) > 0 {
// TODO handle the case where createFleetOutput.Instances > 0
return nil, fmt.Errorf("errors while creating fleet, %v", createFleetOutput.Errors)
}
if len(createFleetOutput.Instances) == 0 {
return nil, fmt.Errorf("create fleet returned 0 instances")
}

// 4. Transform to Nodes.
var instanceIds []*string
for _, fleetInstance := range createFleetOutput.Instances {
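The TODO above notes that CreateFleet can partially succeed, returning Errors alongside launched Instances. A hedged sketch of one way that case might be handled, using the aws-sdk-go ec2 types already imported by this file (the helper name checkFleetOutput is illustrative, not part of the commit):

package example

import (
	"fmt"

	"github.com/aws/aws-sdk-go/service/ec2"
	"go.uber.org/zap"
)

// checkFleetOutput fails only when CreateFleet launched no instances; partial
// failures are surfaced as warnings so the caller can still register the
// instances that did launch.
func checkFleetOutput(out *ec2.CreateFleetOutput) error {
	if len(out.Instances) == 0 {
		return fmt.Errorf("create fleet returned 0 instances, errors: %v", out.Errors)
	}
	if len(out.Errors) > 0 {
		zap.S().Warnf("create fleet returned %d errors alongside %d instances: %v",
			len(out.Errors), len(out.Instances), out.Errors)
	}
	return nil
}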
4 changes: 3 additions & 1 deletion pkg/cloudprovider/aws/fleet/nodefactory.go
@@ -74,7 +74,9 @@ func (n *NodeFactory) nodeFrom(instance *ec2.Instance) *v1.Node {
Status: v1.NodeStatus{
Allocatable: v1.ResourceList{
// TODO, This value is necessary to avoid OutOfPods failure state. Find a way to set this (and cpu/mem) correctly
v1.ResourcePods: resource.MustParse("100"),
v1.ResourcePods: resource.MustParse("100"),
v1.ResourceCPU: resource.MustParse("2"),
v1.ResourceMemory: resource.MustParse("2Gi"),
},
},
}
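The TODO above hardcodes the pod, CPU, and memory allocatable values. One possible direction, sketched here under the assumption that the node factory can call the aws-sdk-go DescribeInstanceTypes API (the allocatableFor helper is illustrative, and real allocatable capacity is lower once kubelet and system reservations are subtracted):

package example

import (
	"fmt"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/service/ec2"
	"github.com/aws/aws-sdk-go/service/ec2/ec2iface"
	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
)

// allocatableFor looks up the instance type and builds a ResourceList from its
// vCPU and memory figures. This is an upper bound, not true allocatable capacity.
func allocatableFor(client ec2iface.EC2API, instanceType string) (v1.ResourceList, error) {
	out, err := client.DescribeInstanceTypes(&ec2.DescribeInstanceTypesInput{
		InstanceTypes: []*string{aws.String(instanceType)},
	})
	if err != nil {
		return nil, fmt.Errorf("describing instance type %s, %w", instanceType, err)
	}
	if len(out.InstanceTypes) == 0 {
		return nil, fmt.Errorf("unknown instance type %s", instanceType)
	}
	info := out.InstanceTypes[0]
	return v1.ResourceList{
		v1.ResourcePods:   resource.MustParse("100"), // still a placeholder, as in this commit
		v1.ResourceCPU:    resource.MustParse(fmt.Sprintf("%d", aws.Int64Value(info.VCpuInfo.DefaultVCpus))),
		v1.ResourceMemory: resource.MustParse(fmt.Sprintf("%dMi", aws.Int64Value(info.MemoryInfo.SizeInMiB))),
	}, nil
}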
@@ -23,13 +23,16 @@ import (
"go.uber.org/zap"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
)

var _ Allocator = &GreedyAllocator{}

// GreedyAllocator iteratively assigns pods to scheduling groups and then creates capacity for each group.
type GreedyAllocator struct {
CloudProvider cloudprovider.Factory
CoreV1Client *corev1.CoreV1Client
}

// Allocate takes a list of unschedulable pods and creates nodes based on
@@ -41,13 +44,80 @@ func (a *GreedyAllocator) Allocate(provisioner *v1alpha1.Provisioner, pods []*v1
zap.S().Infof("Allocating %d pending pods from %d constraint groups", len(pods), len(groups))
// 2. Create capacity for each scheduling group and assign its pods to the new nodes
for _, group := range groups {
if _, err := a.CloudProvider.CapacityFor(&provisioner.Spec).Create(context.TODO(), group.Constraints); err != nil {
nodes, err := a.CloudProvider.CapacityFor(&provisioner.Spec).Create(context.TODO(), group.Constraints)
// TODO accumulate errors if one request fails.
if err != nil {
return fmt.Errorf("while creating capacity, %w", err)
}
if err := a.createNodesAndAssignPods(nodes, group.Pods); err != nil {
return fmt.Errorf("assigning pods to nodes err: %w", err)
}
}
return nil
}

func (a *GreedyAllocator) createNodesAndAssignPods(nodes []*v1.Node, pods []*v1.Pod) error {

// Create the Node object in the cluster for each provisioned instance, then
// bind as many of the remaining pods to it as will fit.
// TODO: score nodes and pods from largest to smallest before assigning.
remainingPods := make([]*v1.Pod, len(pods))
copy(remainingPods, pods)
for _, node := range nodes {
err := a.createNodeObject(node)
if err != nil {
return fmt.Errorf("creating node object %w", err)
}
remainingPods, err = a.assignPodsToNodes(node, remainingPods)
if err != nil {
return fmt.Errorf("update pod spec err: %w", err)
}
}
if len(remainingPods) > 0 {
// this should not happen
return fmt.Errorf("unable to assign %d pods to %d nodes", len(remainingPods), len(nodes))
}
zap.S().Infof("Successfully assigned %d pods to %d node ", len(pods), len(nodes))
return nil
}

func (a *GreedyAllocator) assignPodsToNodes(node *v1.Node, pods []*v1.Pod) ([]*v1.Pod, error) {

remainingPods := make([]*v1.Pod, 0)
for _, pod := range pods {
if !canFitPodOnNode(node, pod) {
remainingPods = append(remainingPods, pod)
continue
}
if err := a.bindPodToNode(node, pod); err != nil {
return nil, fmt.Errorf("binding pod to node failed err %w", err)
}
zap.S().Infof("Pod %s in bind to node %s", pod.Namespace+"/"+pod.Name, node.Name)
}
return remainingPods, nil
}

func (a *GreedyAllocator) bindPodToNode(node *v1.Node, pod *v1.Pod) error {
return a.CoreV1Client.Pods(pod.Namespace).Bind(context.TODO(), &v1.Binding{
TypeMeta: pod.TypeMeta,
ObjectMeta: pod.ObjectMeta,
Target: v1.ObjectReference{
Name: node.Name,
},
}, metav1.CreateOptions{})
}

func canFitPodOnNode(node *v1.Node, pod *v1.Pod) bool {
// TODO podResources := calculateResourcesForPod(pod)
return true
}
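
// Editor's sketch (not part of this commit): the request-vs-allocatable check the
// TODO above alludes to. It sums each container's CPU and memory requests and
// compares them against the node's allocatable resources, using the v1 and
// resource packages this file already imports. Init containers, pod overhead,
// and pods already bound to the node are deliberately ignored here.
func fitsOnNode(node *v1.Node, pod *v1.Pod) bool {
	requests := v1.ResourceList{
		v1.ResourceCPU:    resource.MustParse("0"),
		v1.ResourceMemory: resource.MustParse("0"),
	}
	for _, container := range pod.Spec.Containers {
		for name, quantity := range container.Resources.Requests {
			if existing, ok := requests[name]; ok {
				existing.Add(quantity)
				requests[name] = existing
			}
		}
	}
	for name, requested := range requests {
		allocatable, ok := node.Status.Allocatable[name]
		if !ok || requested.Cmp(allocatable) > 0 {
			return false
		}
	}
	return true
}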

func (a *GreedyAllocator) createNodeObject(node *v1.Node) error {
_, err := a.CoreV1Client.Nodes().Create(context.TODO(), node, metav1.CreateOptions{})
return err
}

type SchedulingGroup struct {
Pods []*v1.Pod
Constraints *cloudprovider.CapacityConstraints
@@ -88,6 +158,7 @@ func schedulingGroupForPod(pod *v1.Pod) *SchedulingGroup {
cloudprovider.TopologyKeyZone: getAvalabiltyZoneForPod(pod),
},
},
Pods: []*v1.Pod{pod},
}
return group
}
3 changes: 2 additions & 1 deletion pkg/controllers/provisioner/v1alpha1/controller.go
@@ -61,9 +61,10 @@ func (c *Controller) Reconcile(object controllers.Object) error {

unschedulable := []*v1.Pod{}
for _, pod := range pods.Items {
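// Copy the loop variable before taking its address; appending &pod directly would
// store the same pointer (the range variable) for every matching pod.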
tempPod := pod
for _, condition := range pod.Status.Conditions {
if condition.Type == v1.PodScheduled && condition.Reason == v1.PodReasonUnschedulable {
unschedulable = append(unschedulable, &pod)
unschedulable = append(unschedulable, &tempPod)
}
}
}
