fix: ensure cluster state is synced prior to scheduling #2182

Merged 1 commit on Jul 25, 2022
44 changes: 44 additions & 0 deletions pkg/controllers/provisioning/scheduling/scheduler.go
@@ -18,10 +18,13 @@ import (
"context"
"fmt"
"sort"
"time"

"github.com/avast/retry-go"
"github.com/samber/lo"
"go.uber.org/multierr"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
"knative.dev/pkg/logging"
"sigs.k8s.io/controller-runtime/pkg/client"

@@ -33,6 +36,10 @@ import (
"github.com/aws/karpenter/pkg/utils/resources"
)

// ClusterSyncRetries controls how many times we attempt to retry waiting on cluster state sync. This is exposed for
// unit testing purposes so we can avoid a lengthy delay in cluster sync.
var ClusterSyncRetries uint = 5

func NewScheduler(ctx context.Context, kubeClient client.Client, nodeTemplates []*scheduling.NodeTemplate, provisioners []v1alpha5.Provisioner, cluster *state.Cluster, topology *Topology, instanceTypes map[string][]cloudprovider.InstanceType, daemonOverhead map[*scheduling.NodeTemplate]v1.ResourceList, recorder events.Recorder) *Scheduler {
for provisioner := range instanceTypes {
sort.Slice(instanceTypes[provisioner], func(i, j int) bool {
@@ -62,6 +69,9 @@ func NewScheduler(ctx context.Context, kubeClient client.Client, nodeTemplates [
}
}

// wait to ensure that our cluster state is synced with the currently known nodes to prevent overshooting
s.waitForClusterStateSync(ctx)

// create our in-flight nodes
s.cluster.ForEachNode(func(node *state.Node) bool {
name, ok := node.Node.Labels[v1alpha5.ProvisionerNameLabelKey]
@@ -224,6 +234,40 @@ func (s *Scheduler) add(ctx context.Context, pod *v1.Pod) error {
return errs
}

// waitForClusterStateSync ensures that our cluster state is aware of at least all of the nodes that our list cache has.
// Since we launch nodes in parallel, we can create many node objects which may not all be reconciled by the cluster
// state before we start trying to schedule again. In this case, we would over-provision as we weren't aware of the
// inflight nodes.
func (s *Scheduler) waitForClusterStateSync(ctx context.Context) {
if err := retry.Do(func() error {
// collect the nodes known by the kube API server
var nodes v1.NodeList
if err := s.kubeClient.List(ctx, &nodes); err != nil {
return err
}
unknownNodes := sets.NewString()
for _, n := range nodes.Items {
unknownNodes.Insert(n.Name)
}

// delete any that cluster state already knows about
s.cluster.ForEachNode(func(n *state.Node) bool {
delete(unknownNodes, n.Node.Name)
return true
})

// and we're left with nodes which exist, but haven't reconciled with cluster state yet
if len(unknownNodes) != 0 {
return fmt.Errorf("%d nodes not known to cluster state", len(unknownNodes))
}
return nil
}, retry.Delay(1*time.Second),
retry.Attempts(ClusterSyncRetries),
); err != nil {
logging.FromContext(ctx).Infof("nodes failed to sync; scheduling may launch more nodes than needed until cluster state catches up")
}
}

// subtractMax returns the remaining resources after subtracting the max resource quantity per instance type. To avoid
// overshooting, we need to pessimistically assume that if e.g. we request a 2, 4 or 8 CPU instance type,
// the 8 CPU instance type is all that will be available. This could cause a batch of pods to take multiple rounds
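For illustration only, not part of this diff: a minimal sketch of the pessimistic subtraction that the subtractMax comment above describes. The names here (subtractMaxSketch, instanceTypeRequests) are hypothetical and do not mirror the real function's signature; the idea is that, for each resource, we subtract the largest quantity any candidate instance type could consume.

package sketch

import (
	v1 "k8s.io/api/core/v1"
)

// subtractMaxSketch is a hypothetical stand-in for subtractMax: it assumes the worst case
// per resource, i.e. that the largest candidate instance type is the one that launches.
func subtractMaxSketch(remaining v1.ResourceList, instanceTypeRequests []v1.ResourceList) v1.ResourceList {
	// find the largest quantity of each resource across all candidate instance types
	maxPerResource := v1.ResourceList{}
	for _, req := range instanceTypeRequests {
		for name, qty := range req {
			if current, ok := maxPerResource[name]; !ok || qty.Cmp(current) > 0 {
				maxPerResource[name] = qty
			}
		}
	}
	// subtract that worst case from the remaining budget
	result := v1.ResourceList{}
	for name, qty := range remaining {
		q := qty.DeepCopy()
		if m, ok := maxPerResource[name]; ok {
			q.Sub(m)
		}
		result[name] = q
	}
	return result
}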
1 change: 1 addition & 0 deletions pkg/controllers/provisioning/scheduling/suite_test.go
@@ -66,6 +66,7 @@ func TestAPIs(t *testing.T) {
}

var _ = BeforeSuite(func() {
scheduling.ClusterSyncRetries = 0
env = test.NewEnvironment(ctx, func(e *test.Environment) {
cloudProv = &fake.CloudProvider{}
cfg = test.NewConfig()
4 changes: 3 additions & 1 deletion pkg/controllers/state/cluster.go
@@ -292,9 +292,11 @@ func (c *Cluster) deleteNode(nodeName string) {

// updateNode is called for every node reconciliation
func (c *Cluster) updateNode(ctx context.Context, node *v1.Node) error {
// perform node lookup before we lock so that the slower operation can occur in parallel
n, err := c.newNode(ctx, node)

c.mu.Lock()
defer c.mu.Unlock()
n, err := c.newNode(ctx, node)
if err != nil {
// ensure that the out of date node is forgotten
delete(c.nodes, node.Name)
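For illustration only, not part of this diff: the reordering above moves the comparatively slow newNode lookup out of the critical section, so that with MaxConcurrentReconciles raised to 10 below, concurrent node reconciles only serialize on the map update itself. A minimal sketch of that pattern, using assumed hypothetical types (cache, nodeState, expensiveLookup) rather than Karpenter's own:

package sketch

import (
	"context"
	"sync"
)

type nodeState struct{}

// expensiveLookup stands in for the slow work (API reads, resource math) done by newNode.
func expensiveLookup(ctx context.Context, name string) (*nodeState, error) {
	return &nodeState{}, nil
}

type cache struct {
	mu    sync.Mutex
	nodes map[string]*nodeState
}

func (c *cache) update(ctx context.Context, name string) error {
	// do the slow work first, without holding the mutex, so concurrent reconciles run in parallel
	n, err := expensiveLookup(ctx, name)

	c.mu.Lock()
	defer c.mu.Unlock()
	if err != nil {
		// forget any stale entry if the lookup failed
		delete(c.nodes, name)
		return err
	}
	c.nodes[name] = n
	return nil
}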
2 changes: 2 additions & 0 deletions pkg/controllers/state/node.go
@@ -22,6 +22,7 @@ import (
"knative.dev/pkg/logging"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)
@@ -65,5 +66,6 @@ func (c *NodeController) Register(ctx context.Context, m manager.Manager) error
NewControllerManagedBy(m).
Named(nodeControllerName).
For(&v1.Node{}).
WithOptions(controller.Options{MaxConcurrentReconciles: 10}).
Complete(c)
}
2 changes: 2 additions & 0 deletions pkg/controllers/state/pod.go
@@ -23,6 +23,7 @@ import (
"knative.dev/pkg/logging"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)
@@ -66,6 +67,7 @@ func (c *PodController) Register(ctx context.Context, m manager.Manager) error {
return controllerruntime.
NewControllerManagedBy(m).
Named(podControllerName).
WithOptions(controller.Options{MaxConcurrentReconciles: 10}).
For(&v1.Pod{}).
Complete(c)
}