Skip to content

Commit

Permalink
Add runid label selector to informer (kube-burner#514)
Browse files Browse the repository at this point in the history
Signed-off-by: Raul Sevilla <[email protected]>
  • Loading branch information
rsevilla87 authored Nov 29, 2023
1 parent 142e646 commit 62cdb5a
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 27 deletions.
4 changes: 2 additions & 2 deletions pkg/measurements/metrics/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,12 @@ type Watcher struct {
}

// NewWatcher return a new ListWatcher of the specified resource and namespace
func NewWatcher(restClient *rest.RESTClient, name string, resource string, namespace string) *Watcher {
func NewWatcher(restClient *rest.RESTClient, name string, resource string, namespace string, optionsModifier func(options *metav1.ListOptions)) *Watcher {
lw := cache.NewFilteredListWatchFromClient(
restClient,
resource,
namespace,
func(options *metav1.ListOptions) {},
optionsModifier,
)
return &Watcher{
name: name,
Expand Down
44 changes: 19 additions & 25 deletions pkg/measurements/pod_latency.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,18 +78,15 @@ func (p *podLatency) handleCreatePod(obj interface{}) {
p.metricLock.Lock()
defer p.metricLock.Unlock()
if _, exists := p.metrics[string(pod.UID)]; !exists {
timestamp, isValid := validatePod(factory.jobConfig.JobType, pod)
if isValid {
p.metrics[string(pod.UID)] = podMetric{
Timestamp: timestamp,
Namespace: pod.Namespace,
Name: pod.Name,
MetricName: podLatencyMeasurement,
UUID: globalCfg.UUID,
JobConfig: *factory.jobConfig,
JobName: factory.jobConfig.Name,
Metadata: factory.metadata,
}
p.metrics[string(pod.UID)] = podMetric{
Timestamp: pod.CreationTimestamp.Time.UTC(),
Namespace: pod.Namespace,
Name: pod.Name,
MetricName: podLatencyMeasurement,
UUID: globalCfg.UUID,
JobConfig: *factory.jobConfig,
JobName: factory.jobConfig.Name,
Metadata: factory.metadata,
}
}
}
Expand Down Expand Up @@ -138,13 +135,20 @@ func (p *podLatency) setConfig(cfg types.Measurement) error {
// start starts podLatency measurement
func (p *podLatency) start(measurementWg *sync.WaitGroup) {
defer measurementWg.Done()
if factory.jobConfig.JobType == config.DeletionJob {
log.Info("Pod latency measurement not compatible with delete jobs, skipping")
return
}
p.metrics = make(map[string]podMetric)
log.Infof("Creating Pod latency watcher for %s", factory.jobConfig.Name)
p.watcher = metrics.NewWatcher(
factory.clientSet.CoreV1().RESTClient().(*rest.RESTClient),
"podWatcher",
"pods",
corev1.NamespaceAll,
func(options *metav1.ListOptions) {
options.LabelSelector = fmt.Sprintf("kube-burner-runid=%s", globalCfg.RUNID)
},
)
p.watcher.Informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: p.handleCreatePod,
Expand Down Expand Up @@ -208,6 +212,9 @@ func (p *podLatency) collect(measurementWg *sync.WaitGroup) {

// Stop stops podLatency measurement
func (p *podLatency) stop() error {
if factory.jobConfig.JobType == config.DeletionJob {
return nil
}
var err error
if p.watcher != nil {
p.watcher.StopWatcher()
Expand Down Expand Up @@ -377,16 +384,3 @@ func (p *podLatency) validateConfig() error {
}
return nil
}

// validatePod validates a pod based on job type and returns its timestamp for latency calculation.
// It returns a timestamp and a boolean value indicating validation details.
func validatePod(jobType config.JobType, pod *corev1.Pod) (time.Time, bool) {
if jobType == config.CreationJob {
runid, exists := pod.Labels["kube-burner-runid"]
if exists && runid == globalCfg.RUNID {
return pod.CreationTimestamp.Time.UTC(), true
}
return pod.CreationTimestamp.Time.UTC(), false
}
return pod.Status.StartTime.Time.UTC(), true
}
17 changes: 17 additions & 0 deletions pkg/measurements/vmi_latency.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/cloud-bulldozer/kube-burner/pkg/measurements/types"
log "github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/client-go/rest"
Expand Down Expand Up @@ -286,6 +287,10 @@ func (p *vmiLatency) setConfig(cfg types.Measurement) error {

// Start starts vmiLatency measurement
func (p *vmiLatency) start(measurementWg *sync.WaitGroup) {
if factory.jobConfig.JobType == config.DeletionJob {
log.Info("VMI latency measurement not compatible with delete jobs, skipping")
return
}
defer measurementWg.Done()
p.metrics = make(map[string]*vmiMetric)

Expand All @@ -296,6 +301,9 @@ func (p *vmiLatency) start(measurementWg *sync.WaitGroup) {
"vmWatcher",
"virtualmachines",
corev1.NamespaceAll,
func(options *metav1.ListOptions) {
options.LabelSelector = fmt.Sprintf("kube-burner-runid=%s", globalCfg.RUNID)
},
)
p.vmWatcher.Informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: p.handleCreateVM,
Expand All @@ -313,6 +321,9 @@ func (p *vmiLatency) start(measurementWg *sync.WaitGroup) {
"vmiWatcher",
"virtualmachineinstances",
corev1.NamespaceAll,
func(options *metav1.ListOptions) {
options.LabelSelector = fmt.Sprintf("kube-burner-runid=%s", globalCfg.RUNID)
},
)
p.vmiWatcher.Informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: p.handleCreateVMI,
Expand All @@ -330,6 +341,9 @@ func (p *vmiLatency) start(measurementWg *sync.WaitGroup) {
"podWatcher",
"pods",
corev1.NamespaceAll,
func(options *metav1.ListOptions) {
options.LabelSelector = fmt.Sprintf("kube-burner-runid=%s", globalCfg.RUNID)
},
)
p.vmiPodWatcher.Informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: p.handleCreateVMIPod,
Expand Down Expand Up @@ -371,6 +385,9 @@ func (p *vmiLatency) collect(measurementWg *sync.WaitGroup) {

// Stop stops vmiLatency measurement
func (p *vmiLatency) stop() error {
if factory.jobConfig.JobType == config.DeletionJob {
return nil
}
p.vmWatcher.StopWatcher()
p.vmiWatcher.StopWatcher()
p.vmiPodWatcher.StopWatcher()
Expand Down

0 comments on commit 62cdb5a

Please sign in to comment.