Commit e2fa6f7

Test: Fixed a few bugs with running integration tests in the integration environment (aws#2184)

* Test: Add explicit tolerations in test setup since snapshot releases do not use latest yaml (yet); see the sketch below

* Restored clustername discovery

* PR Comments
ellistarn committed Jul 26, 2022
1 parent 4fad429 commit e2fa6f7
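
The toleration fix mentioned in the first commit-message bullet is not part of the two files changed below. Purely as an illustration of what an explicit toleration in a test pod spec looks like (the taint key, pod name, and image are hypothetical, and the repository's own test helpers may be used instead of raw core v1 types):

package sketch

import (
    v1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// testPodWithToleration builds a pod that tolerates a hypothetical taint which
// nodes launched from older (snapshot) YAML might still carry.
func testPodWithToleration() *v1.Pod {
    return &v1.Pod{
        ObjectMeta: metav1.ObjectMeta{Name: "integration-test-pod"},
        Spec: v1.PodSpec{
            Containers: []v1.Container{{
                Name:  "app",
                Image: "registry.k8s.io/pause", // placeholder image
            }},
            Tolerations: []v1.Toleration{{
                Key:      "example.com/legacy-taint", // hypothetical taint key
                Operator: v1.TolerationOpExists,
                Effect:   v1.TaintEffectNoSchedule,
            }},
        },
    }
}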
Showing 2 changed files with 26 additions and 33 deletions.
pkg/controllers/provisioning/scheduling/scheduler.go (25 additions, 32 deletions)
@@ -20,7 +20,6 @@ import (
     "sort"
     "time"
 
-    "github.com/avast/retry-go"
     "github.com/samber/lo"
     "go.uber.org/multierr"
     v1 "k8s.io/api/core/v1"
@@ -36,9 +35,9 @@ import (
     "github.com/aws/karpenter/pkg/utils/resources"
 )
 
-// ClusterSyncRetries controls how many times we attempt to retry waiting on cluster state sync. This is exposed for
+// WaitForClusterSync controls whether or not we synchronize before scheduling. This is exposed for
 // unit testing purposes so we can avoid a lengthy delay in cluster sync.
-var ClusterSyncRetries uint = 5
+var WaitForClusterSync = true
 
 func NewScheduler(ctx context.Context, kubeClient client.Client, nodeTemplates []*scheduling.NodeTemplate, provisioners []v1alpha5.Provisioner, cluster *state.Cluster, topology *Topology, instanceTypes map[string][]cloudprovider.InstanceType, daemonOverhead map[*scheduling.NodeTemplate]v1.ResourceList, recorder events.Recorder) *Scheduler {
     for provisioner := range instanceTypes {
@@ -70,7 +69,9 @@ func NewScheduler(ctx context.Context, kubeClient client.Client, nodeTemplates [
     }
 
     // wait to ensure that our cluster state is synced with the current known nodes to prevent over-shooting
-    s.waitForClusterStateSync(ctx)
+    for err := s.clusterStateSynchronized(ctx); err != nil; time.Sleep(1 * time.Second) {
+        logging.FromContext(ctx).Info("waiting for cluster state to catch up, %s", err)
+    }
 
     // create our in-flight nodes
     s.cluster.ForEachNode(func(node *state.Node) bool {
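
In the hunk above, the commit drops the bounded retry-go wait (ClusterSyncRetries attempts) in favor of a loop that polls clusterStateSynchronized and logs while cluster state catches up. The following standalone sketch shows only the general poll-until-synced shape; the function name, the fmt-based logging, and the context handling are illustrative assumptions, not code from this commit.

package sketch

import (
    "context"
    "fmt"
    "time"
)

// waitUntilSynced polls a predicate until it returns nil, logging and sleeping
// between attempts. Everything here is illustrative of the waiting pattern only.
func waitUntilSynced(ctx context.Context, synced func(context.Context) error) {
    for {
        err := synced(ctx)
        if err == nil {
            return
        }
        fmt.Printf("waiting for cluster state to catch up, %s\n", err)
        select {
        case <-ctx.Done():
            return
        case <-time.After(1 * time.Second):
        }
    }
}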
@@ -234,38 +235,30 @@ func (s *Scheduler) add(ctx context.Context, pod *v1.Pod) error {
     return errs
 }
 
-// waitForClusterStateSync ensures that our cluster state is aware of at least all of the nodes that our list cache has.
+// clusterStateSynchronized ensures that our cluster state is aware of at least all of the nodes that our list cache has.
 // Since we launch nodes in parallel, we can create many node objects which may not all be reconciled by the cluster
 // state before we start trying to schedule again. In this case, we would over-provision as we weren't aware of the
 // inflight nodes.
-func (s *Scheduler) waitForClusterStateSync(ctx context.Context) {
-    if err := retry.Do(func() error {
-        // collect the nodes known by the kube API server
-        var nodes v1.NodeList
-        if err := s.kubeClient.List(ctx, &nodes); err != nil {
-            return nil
-        }
-        unknownNodes := sets.NewString()
-        for _, n := range nodes.Items {
-            unknownNodes.Insert(n.Name)
-        }
-
-        // delete any that cluster state already knows about
-        s.cluster.ForEachNode(func(n *state.Node) bool {
-            delete(unknownNodes, n.Node.Name)
-            return true
-        })
-
-        // and we're left with nodes which exist, but haven't reconciled with cluster state yet
-        if len(unknownNodes) != 0 {
-            return fmt.Errorf("%d nodes not known to cluster state", len(unknownNodes))
-        }
-        return nil
-    }, retry.Delay(1*time.Second),
-        retry.Attempts(ClusterSyncRetries),
-    ); err != nil {
-        logging.FromContext(ctx).Infof("nodes failed to sync, may launch too many nodes which should resolve")
-    }
-}
+func (s *Scheduler) clusterStateSynchronized(ctx context.Context) error {
+    // collect the nodes known by the kube API server
+    var nodes v1.NodeList
+    if err := s.kubeClient.List(ctx, &nodes); err != nil {
+        return err
+    }
+    unknownNodes := sets.NewString()
+    for _, n := range nodes.Items {
+        unknownNodes.Insert(n.Name)
+    }
+    // delete any that cluster state already knows about
+    s.cluster.ForEachNode(func(n *state.Node) bool {
+        delete(unknownNodes, n.Node.Name)
+        return true
+    })
+    // and we're left with nodes which exist, but haven't reconciled with cluster state yet
+    if len(unknownNodes) != 0 {
+        return fmt.Errorf("%d nodes not known to cluster state", len(unknownNodes))
+    }
+    return nil
+}
 
 // subtractMax returns the remaining resources after subtracting the max resource quantity per instance type. To avoid
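
The refactored clusterStateSynchronized above is essentially a set difference: the nodes the API server lists minus the nodes the in-memory cluster state has already reconciled. A small self-contained sketch of that pattern using apimachinery's string sets follows; the node names are made up and this is not the repository's code.

package main

import (
    "fmt"

    "k8s.io/apimachinery/pkg/util/sets"
)

func main() {
    // Nodes the kube API server reports (illustrative names).
    apiServerNodes := sets.NewString("node-a", "node-b", "node-c")
    // Nodes the in-memory cluster state has already reconciled.
    clusterStateNodes := sets.NewString("node-a", "node-b")

    // Anything left over exists in the API server but is not yet known to
    // cluster state, so scheduling now could over-provision.
    unknown := apiServerNodes.Difference(clusterStateNodes)
    if unknown.Len() != 0 {
        fmt.Printf("%d nodes not known to cluster state: %v\n", unknown.Len(), unknown.List())
    } else {
        fmt.Println("cluster state is synchronized")
    }
}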
pkg/controllers/provisioning/scheduling/suite_test.go (1 addition, 1 deletion)
@@ -66,7 +66,7 @@ func TestAPIs(t *testing.T) {
 }
 
 var _ = BeforeSuite(func() {
-    scheduling.ClusterSyncRetries = 0
+    scheduling.WaitForClusterSync = false
     env = test.NewEnvironment(ctx, func(e *test.Environment) {
         cloudProv = &fake.CloudProvider{}
         cfg = test.NewConfig()
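
The suite flips the new package-level knob once in BeforeSuite so unit tests never sit in the one-second sync loop. As a sketch only (the test name and body are placeholders, not part of this commit), an individual test could scope the same override with save-and-restore:

package scheduling_test

import (
    "testing"

    "github.com/aws/karpenter/pkg/controllers/provisioning/scheduling"
)

// TestWithoutClusterSync shows the save-and-restore pattern for the
// package-level flag; the body is a placeholder.
func TestWithoutClusterSync(t *testing.T) {
    prev := scheduling.WaitForClusterSync
    scheduling.WaitForClusterSync = false
    defer func() { scheduling.WaitForClusterSync = prev }()

    // ... construct a Scheduler and exercise scheduling here without the
    // sync polling delaying the test ...
}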
