Fix: Remove spurious logging from tests and indefinitely rely on cluster state sync (#2188)

* Test: Add explicit tolerations in test setup since snapshot releases do not use latest yaml (yet)

* Restored clustername discovery

* PR Comments
ellistarn authored Jul 27, 2022
1 parent 60419cf commit 6ea12fb
Showing 5 changed files with 41 additions and 41 deletions.
49 changes: 10 additions & 39 deletions pkg/controllers/provisioning/scheduling/scheduler.go
@@ -20,11 +20,9 @@ import (
"sort"
"time"

"github.com/avast/retry-go"
"github.com/samber/lo"
"go.uber.org/multierr"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
"knative.dev/pkg/logging"
"sigs.k8s.io/controller-runtime/pkg/client"

@@ -36,9 +34,9 @@ import (
"github.com/aws/karpenter/pkg/utils/resources"
)

- // ClusterSyncRetries controls how many times we attempt to retry waiting on cluster state sync. This is exposed for
+ // WaitForClusterSync controls whether or not we synchronize before scheduling. This is exposed for
// unit testing purposes so we can avoid a lengthy delay in cluster sync.
- var ClusterSyncRetries uint = 5
+ var WaitForClusterSync = true

func NewScheduler(ctx context.Context, kubeClient client.Client, nodeTemplates []*scheduling.NodeTemplate, provisioners []v1alpha5.Provisioner, cluster *state.Cluster, topology *Topology, instanceTypes map[string][]cloudprovider.InstanceType, daemonOverhead map[*scheduling.NodeTemplate]v1.ResourceList, recorder events.Recorder) *Scheduler {
for provisioner := range instanceTypes {
@@ -70,7 +68,14 @@ func NewScheduler(ctx context.Context, kubeClient client.Client, nodeTemplates [
}

// wait to ensure that our cluster state is synced with the current known nodes to prevent over-shooting
- s.waitForClusterStateSync(ctx)
+ for WaitForClusterSync {
+ if err := s.cluster.Synchronized(ctx); err != nil {
+ logging.FromContext(ctx).Infof("waiting for cluster state to catch up, %s", err)
+ time.Sleep(1 * time.Second)
+ } else {
+ break
+ }
+ }

// create our in-flight nodes
s.cluster.ForEachNode(func(node *state.Node) bool {
@@ -234,40 +239,6 @@ func (s *Scheduler) add(ctx context.Context, pod *v1.Pod) error {
return errs
}

- // waitForClusterStateSync ensures that our cluster state is aware of at least all of the nodes that our list cache has.
- // Since we launch nodes in parallel, we can create many node objects which may not all be reconciled by the cluster
- // state before we start trying to schedule again. In this case, we would over-provision as we weren't aware of the
- // inflight nodes.
- func (s *Scheduler) waitForClusterStateSync(ctx context.Context) {
- if err := retry.Do(func() error {
- // collect the nodes known by the kube API server
- var nodes v1.NodeList
- if err := s.kubeClient.List(ctx, &nodes); err != nil {
- return nil
- }
- unknownNodes := sets.NewString()
- for _, n := range nodes.Items {
- unknownNodes.Insert(n.Name)
- }
-
- // delete any that cluster state already knows about
- s.cluster.ForEachNode(func(n *state.Node) bool {
- delete(unknownNodes, n.Node.Name)
- return true
- })
-
- // and we're left with nodes which exist, but haven't reconciled with cluster state yet
- if len(unknownNodes) != 0 {
- return fmt.Errorf("%d nodes not known to cluster state", len(unknownNodes))
- }
- return nil
- }, retry.Delay(1*time.Second),
- retry.Attempts(ClusterSyncRetries),
- ); err != nil {
- logging.FromContext(ctx).Infof("nodes failed to sync, may launch too many nodes which should resolve")
- }
- }

// subtractMax returns the remaining resources after subtracting the max resource quantity per instance type. To avoid
// overshooting out, we need to pessimistically assume that if e.g. we request a 2, 4 or 8 CPU instance type
// that the 8 CPU instance type is all that will be available. This could cause a batch of pods to take multiple rounds
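The subtractMax comment above describes pessimistic accounting: when several instance types (e.g. 2, 4, or 8 CPU) could satisfy a pod, the scheduler assumes the largest one will launch before deducting capacity from the remaining limit. The standalone Go sketch below illustrates only that idea; subtractMaxCPU is a hypothetical helper with arbitrary example values, not the actual subtractMax implementation in scheduler.go.

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
)

// subtractMaxCPU deducts the largest CPU size among the candidate instance
// types from the remaining limit, pessimistically assuming the largest
// candidate is the one that actually launches.
func subtractMaxCPU(remaining v1.ResourceList, candidateCPUs []resource.Quantity) v1.ResourceList {
	largest := resource.Quantity{}
	for _, c := range candidateCPUs {
		if c.Cmp(largest) > 0 {
			largest = c
		}
	}
	cpu := remaining[v1.ResourceCPU].DeepCopy()
	cpu.Sub(largest)
	out := remaining.DeepCopy()
	out[v1.ResourceCPU] = cpu
	return out
}

func main() {
	remaining := v1.ResourceList{v1.ResourceCPU: resource.MustParse("16")}
	candidates := []resource.Quantity{
		resource.MustParse("2"),
		resource.MustParse("4"),
		resource.MustParse("8"),
	}
	result := subtractMaxCPU(remaining, candidates)
	cpu := result[v1.ResourceCPU]
	fmt.Println(cpu.String()) // 16 - 8 = 8 CPUs assumed to remain, even if a 2 CPU instance launches
}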
1 change: 0 additions & 1 deletion pkg/controllers/provisioning/scheduling/suite_test.go
@@ -66,7 +66,6 @@ func TestAPIs(t *testing.T) {
}

var _ = BeforeSuite(func() {
- scheduling.ClusterSyncRetries = 0
env = test.NewEnvironment(ctx, func(e *test.Environment) {
cloudProv = &fake.CloudProvider{}
cfg = test.NewConfig()
27 changes: 27 additions & 0 deletions pkg/controllers/state/cluster.go
@@ -28,6 +28,7 @@ import (
storagev1 "k8s.io/api/storage/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/aws/karpenter/pkg/apis/provisioning/v1alpha5"
@@ -452,3 +453,29 @@ func (c *Cluster) populateInstanceType(ctx context.Context, node *v1.Node, n *No
}
return fmt.Errorf("instance type '%s' not found", instanceTypeName)
}

+ // Synchronized ensures that our cluster state is aware of at least all of the nodes that our list cache has.
+ // Since we launch nodes in parallel, we can create many node objects which may not all be reconciled by the cluster
+ // state before we start trying to schedule again. In this case, we would over-provision as we weren't aware of the
+ // inflight nodes.
+ func (c *Cluster) Synchronized(ctx context.Context) error {
+ // collect the nodes known by the kube API server
+ var nodes v1.NodeList
+ if err := c.kubeClient.List(ctx, &nodes); err != nil {
+ return err
+ }
+ unknownNodes := sets.NewString()
+ for _, n := range nodes.Items {
+ unknownNodes.Insert(n.Name)
+ }
+ // delete any that cluster state already knows about
+ c.ForEachNode(func(n *Node) bool {
+ unknownNodes.Delete(n.Node.Name)
+ return true
+ })
+ // and we're left with nodes which exist, but haven't reconciled with cluster state yet
+ if len(unknownNodes) != 0 {
+ return fmt.Errorf("%d/%d nodes not yet synchronized", unknownNodes.Len(), len(nodes.Items))
+ }
+ return nil
+ }
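With Synchronized exposed on Cluster, the scheduler above simply loops until it returns nil. A caller that prefers a bounded wait could wrap it as in the hypothetical sketch below; waitForSync and its timeout are illustrative and not part of this commit, and the import path assumes the module layout shown elsewhere in this diff.

package example

import (
	"context"
	"fmt"
	"time"

	"github.com/aws/karpenter/pkg/controllers/state"
)

// waitForSync polls Cluster.Synchronized until it reports success or the
// deadline passes, instead of waiting indefinitely like the scheduler does.
func waitForSync(ctx context.Context, cluster *state.Cluster, timeout time.Duration) error {
	deadline := time.Now().Add(timeout)
	for {
		err := cluster.Synchronized(ctx)
		if err == nil {
			return nil
		}
		if time.Now().After(deadline) {
			return fmt.Errorf("cluster state did not synchronize within %s: %w", timeout, err)
		}
		time.Sleep(1 * time.Second)
	}
}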
3 changes: 3 additions & 0 deletions pkg/test/environment.go
@@ -27,6 +27,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/envtest"

"github.com/aws/karpenter/pkg/apis"
"github.com/aws/karpenter/pkg/controllers/provisioning/scheduling"
"github.com/aws/karpenter/pkg/utils/project"
)

@@ -71,6 +72,8 @@ type Environment struct {
type EnvironmentOption func(env *Environment)

func NewEnvironment(ctx context.Context, options ...EnvironmentOption) *Environment {
+ scheduling.WaitForClusterSync = false

ctx, stop := context.WithCancel(ctx)
return &Environment{
Environment: envtest.Environment{
2 changes: 1 addition & 1 deletion pkg/test/provisioner.go
@@ -57,7 +57,7 @@ func Provisioner(overrides ...ProvisionerOptions) *v1alpha5.Provisioner {
options.Name = RandomName()
}
if options.Limits == nil {
- options.Limits = v1.ResourceList{v1.ResourceCPU: resource.MustParse("1000")}
+ options.Limits = v1.ResourceList{v1.ResourceCPU: resource.MustParse("2000")}
}

provisioner := &v1alpha5.Provisioner{
Expand Down
