diff --git a/clusterloader2/pkg/measurement/common/slos/pod_startup_latency.go b/clusterloader2/pkg/measurement/common/slos/pod_startup_latency.go index 37ce57f6ac..8df7446a3e 100644 --- a/clusterloader2/pkg/measurement/common/slos/pod_startup_latency.go +++ b/clusterloader2/pkg/measurement/common/slos/pod_startup_latency.go @@ -70,9 +70,13 @@ type eventData struct { } type podStartupLatencyMeasurement struct { - selector *measurementutil.ObjectSelector - isRunning bool - stopCh chan struct{} + selector *measurementutil.ObjectSelector + isPodProcessorRunning bool + podProcessorStopCh chan struct{} + + isEventProcessorRunning bool + eventProcessorStopCh chan struct{} + // This queue can potentially grow indefinitely, beacause we put all changes here. // Usually it's not recommended pattern, but we need it for measuring PodStartupLatency. eventQueue *workqueue.Type @@ -101,7 +105,15 @@ func (p *podStartupLatencyMeasurement) Execute(config *measurement.Config) ([]me if err != nil { return nil, err } - return nil, p.start(config.ClusterFramework.GetClientSets().GetClient()) + err = p.start(config.ClusterFramework.GetClientSets().GetClient()) + if err != nil { + return nil, err + } + schedulerName, err := util.GetStringOrDefault(config.Params, "schedulerName", defaultSchedulerName) + if err != nil { + return nil, err + } + return nil, p.startEventProcessor(config.ClusterFramework.GetClientSets().GetClient(), schedulerName) case "gather": schedulerName, err := util.GetStringOrDefault(config.Params, "schedulerName", defaultSchedulerName) if err != nil { @@ -124,14 +136,61 @@ func (p *podStartupLatencyMeasurement) String() string { return podStartupLatencyMeasurementName + ": " + p.selector.String() } +func (p *podStartupLatencyMeasurement) startEventProcessor(c clientset.Interface, schedulerName string) error { + if p.isEventProcessorRunning { + klog.V(2).Infof("%s: pod event processor already running", p) + return nil + } + + klog.V(2).Infof("%s: starting pod event processor...", p) + p.isEventProcessorRunning = true + p.eventProcessorStopCh = make(chan struct{}) + selector := fields.Set{ + "involvedObject.kind": "Pod", + "source": schedulerName, + }.AsSelector().String() + i := informer.NewInformer( + &cache.ListWatch{ + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + return c.CoreV1().Events(p.selector.Namespace).List(context.TODO(), metav1.ListOptions{ + LabelSelector: selector, + }) + }, + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + return c.CoreV1().Events(p.selector.Namespace).Watch(context.TODO(), metav1.ListOptions{ + LabelSelector: selector, + }) + }, + }, + func(i1, i2 interface{}) { + if i1 != nil && i2 != nil { + return + } + if i2 == nil { + return + } + event := i1.(*corev1.Event) + key := createMetaNamespaceKey(event.InvolvedObject.Namespace, event.InvolvedObject.Name) + if _, exists := p.podStartupEntries.Get(key, createPhase); exists { + if !event.EventTime.IsZero() { + p.podStartupEntries.Set(key, schedulePhase, event.EventTime.Time) + } else { + p.podStartupEntries.Set(key, schedulePhase, event.FirstTimestamp.Time) + } + } + }, + ) + return informer.StartAndSync(i, p.eventProcessorStopCh, informerSyncTimeout) +} + func (p *podStartupLatencyMeasurement) start(c clientset.Interface) error { - if p.isRunning { + if p.isPodProcessorRunning { klog.V(2).Infof("%s: pod startup latancy measurement already running", p) return nil } klog.V(2).Infof("%s: starting pod startup latency measurement...", p) - p.isRunning = true - p.stopCh = make(chan struct{}) + p.isPodProcessorRunning = true + p.podProcessorStopCh = make(chan struct{}) i := informer.NewInformer( &cache.ListWatch{ ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { @@ -146,7 +205,7 @@ func (p *podStartupLatencyMeasurement) start(c clientset.Interface) error { p.addEvent, ) go p.processEvents() - return informer.StartAndSync(i, p.stopCh, informerSyncTimeout) + return informer.StartAndSync(i, p.podProcessorStopCh, informerSyncTimeout) } func (p *podStartupLatencyMeasurement) addEvent(_, obj interface{}) { @@ -176,9 +235,11 @@ func (p *podStartupLatencyMeasurement) processNextWorkItem() bool { } func (p *podStartupLatencyMeasurement) stop() { - if p.isRunning { - p.isRunning = false - close(p.stopCh) + if p.isPodProcessorRunning { + p.isPodProcessorRunning = false + p.isEventProcessorRunning = false + close(p.podProcessorStopCh) + close(p.eventProcessorStopCh) p.eventQueue.ShutDown() } } @@ -224,15 +285,15 @@ type podStartupLatencyCheck struct { func (p *podStartupLatencyMeasurement) gather(c clientset.Interface, identifier string, schedulerName string) ([]measurement.Summary, error) { klog.V(2).Infof("%s: gathering pod startup latency measurement...", p) - if !p.isRunning { + if !p.isPodProcessorRunning { return nil, fmt.Errorf("metric %s has not been started", podStartupLatencyMeasurementName) } p.stop() - if err := p.gatherScheduleTimes(c, schedulerName); err != nil { - return nil, err - } + // if err := p.gatherScheduleTimes(c, schedulerName); err != nil { + // return nil, err + // } checks := []podStartupLatencyCheck{ {