Skip to content

Commit

Permalink
Update scheduler initialization in predicates.go
Browse files Browse the repository at this point in the history
With previous code and recent changes to scheduler codebase we did not
initialize informer hooks which updated scheduler cache used by some of
the predicate function (especially MatchInterPodAffinity).

This change replicates how scheduler is initialized now - I.e now we
actually create Scheduler object now and do explicity call AddAllEventHandlers.

As a followup a discussion will be started with sig-scheduling to make
this initialization in CA simpler and with less copy-pasting of code
from scheduler codebase.
  • Loading branch information
losipiuk committed Mar 14, 2019
1 parent 26f45ff commit 5059184
Showing 1 changed file with 75 additions and 24 deletions.
99 changes: 75 additions & 24 deletions cluster-autoscaler/simulator/predicates.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,14 @@ import (
"strings"

apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
informers "k8s.io/client-go/informers"
kube_client "k8s.io/client-go/kubernetes"
"k8s.io/kubernetes/pkg/scheduler"
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
"k8s.io/kubernetes/pkg/scheduler/factory"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"

Expand Down Expand Up @@ -64,36 +68,79 @@ func init() {
algorithmprovider.ApplyFeatureGates()
}

// NoOpEventRecorder is a noop implementation of EventRecorder
type NoOpEventRecorder struct{}

// Event is a noop method implementation
func (NoOpEventRecorder) Event(object runtime.Object, eventtype, reason, message string) {
}

// Eventf is a noop method implementation
func (NoOpEventRecorder) Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{}) {
}

// PastEventf is a noop method implementation
func (NoOpEventRecorder) PastEventf(object runtime.Object, timestamp metav1.Time, eventtype, reason, messageFmt string, args ...interface{}) {
}

// AnnotatedEventf is a noop method implementation
func (NoOpEventRecorder) AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{}) {
}

// NewPredicateChecker builds PredicateChecker.
func NewPredicateChecker(kubeClient kube_client.Interface, stop <-chan struct{}) (*PredicateChecker, error) {
provider, err := factory.GetAlgorithmProvider(factory.DefaultProvider)
if err != nil {
return nil, err
}
informerFactory := informers.NewSharedInformerFactory(kubeClient, 0)

schedulerConfigFactory := factory.NewConfigFactory(&factory.ConfigFactoryArgs{
SchedulerName: "cluster-autoscaler",
algorithmProvider := factory.DefaultProvider

// Set up the configurator which can create schedulers from configs.
nodeInformer := informerFactory.Core().V1().Nodes()
podInformer := informerFactory.Core().V1().Pods()
pvInformer := informerFactory.Core().V1().PersistentVolumes()
pvcInformer := informerFactory.Core().V1().PersistentVolumeClaims()
replicationControllerInformer := informerFactory.Core().V1().ReplicationControllers()
replicaSetInformer := informerFactory.Apps().V1().ReplicaSets()
statefulSetInformer := informerFactory.Apps().V1().StatefulSets()
serviceInformer := informerFactory.Core().V1().Services()
pdbInformer := informerFactory.Policy().V1beta1().PodDisruptionBudgets()
storageClassInformer := informerFactory.Storage().V1().StorageClasses()
configurator := factory.NewConfigFactory(&factory.ConfigFactoryArgs{
SchedulerName: apiv1.DefaultSchedulerName,
Client: kubeClient,
NodeInformer: informerFactory.Core().V1().Nodes(),
PodInformer: informerFactory.Core().V1().Pods(),
PvInformer: informerFactory.Core().V1().PersistentVolumes(),
PvcInformer: informerFactory.Core().V1().PersistentVolumeClaims(),
ReplicationControllerInformer: informerFactory.Core().V1().ReplicationControllers(),
ReplicaSetInformer: informerFactory.Apps().V1().ReplicaSets(),
StatefulSetInformer: informerFactory.Apps().V1().StatefulSets(),
ServiceInformer: informerFactory.Core().V1().Services(),
PdbInformer: informerFactory.Policy().V1beta1().PodDisruptionBudgets(),
StorageClassInformer: informerFactory.Storage().V1().StorageClasses(),
NodeInformer: nodeInformer,
PodInformer: podInformer,
PvInformer: pvInformer,
PvcInformer: pvcInformer,
ReplicationControllerInformer: replicationControllerInformer,
ReplicaSetInformer: replicaSetInformer,
StatefulSetInformer: statefulSetInformer,
ServiceInformer: serviceInformer,
PdbInformer: pdbInformer,
StorageClassInformer: storageClassInformer,
HardPodAffinitySymmetricWeight: apiv1.DefaultHardPodAffinitySymmetricWeight,
DisablePreemption: false,
PercentageOfNodesToScore: schedulerapi.DefaultPercentageOfNodesToScore,
BindTimeoutSeconds: scheduler.BindTimeoutSeconds,
})
informerFactory.Start(stop)

metadataProducer, err := schedulerConfigFactory.GetPredicateMetadataProducer()
// Create the config from a named algorithm provider.
config, err := configurator.CreateFromProvider(algorithmProvider)
if err != nil {
return nil, err
return nil, fmt.Errorf("couldn't create scheduler using provider %q: %v", algorithmProvider, err)
}
// Additional tweaks to the config produced by the configurator.
config.Recorder = NoOpEventRecorder{}
config.DisablePreemption = false
config.StopEverything = stop

// Create the scheduler.
sched := scheduler.NewFromConfig(config)

scheduler.AddAllEventHandlers(sched, apiv1.DefaultSchedulerName,
nodeInformer, podInformer, pvInformer, pvcInformer, replicationControllerInformer, replicaSetInformer, statefulSetInformer, serviceInformer, pdbInformer, storageClassInformer)

predicateMap := map[string]predicates.FitPredicate{}
for predicateName, predicateFunc := range sched.Config().Algorithm.Predicates() {
predicateMap[predicateName] = predicateFunc
}
predicateMap, err := schedulerConfigFactory.GetPredicates(provider.FitPredicateKeys)
predicateMap["ready"] = isNodeReadyAndSchedulablePredicate
if err != nil {
return nil, err
Expand Down Expand Up @@ -121,8 +168,12 @@ func NewPredicateChecker(kubeClient kube_client.Interface, stop <-chan struct{})
klog.V(1).Infof("Using predicate %s", predInfo.name)
}

// TODO: Verify that run is not needed anymore.
// schedulerConfigFactory.Run()
informerFactory.Start(stop)

metadataProducer, err := configurator.GetPredicateMetadataProducer()
if err != nil {
return nil, fmt.Errorf("could not obtain predicateMetadataProducer; %v", err.Error())
}

return &PredicateChecker{
predicates: predicateList,
Expand Down

0 comments on commit 5059184

Please sign in to comment.