
Adding churn metrics (kube-burner#599)
## Type of change

- [x] Refactor
- [x] New feature
- [ ] Bug fix
- [ ] Optimization
- [ ] Documentation Update

## Description
Adds a `churnMetric` flag to identify, and optionally filter out, the churn-window samples among
the metrics collected from our metrics and alert profiles.
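
To make the mechanism concrete, here is a minimal, self-contained sketch of the tagging idea: a sample whose timestamp falls inside the churn window gets `churnMetric: true`. The `metric` struct, `tagChurn` helper, and `main` below are illustrative only and are not the code added by this PR.
```go
package main

import (
	"fmt"
	"time"
)

// metric mirrors the shape of the indexed documents used here:
// churnMetric is only serialized when true (omitempty).
type metric struct {
	Timestamp   time.Time `json:"timestamp"`
	MetricName  string    `json:"metricName"`
	ChurnMetric bool      `json:"churnMetric,omitempty"`
}

// tagChurn marks every sample whose timestamp falls inside the churn window.
// A nil window means churn was not enabled, so nothing is tagged.
func tagChurn(samples []metric, churnStart, churnEnd *time.Time) {
	if churnStart == nil || churnEnd == nil {
		return
	}
	for i := range samples {
		ts := samples[i].Timestamp
		if ts.After(*churnStart) && ts.Before(*churnEnd) {
			samples[i].ChurnMetric = true
		}
	}
}

func main() {
	start := time.Now().Add(-10 * time.Minute)
	end := time.Now()
	samples := []metric{
		{Timestamp: start.Add(-time.Minute), MetricName: "serviceSyncLatency"},
		{Timestamp: start.Add(5 * time.Minute), MetricName: "serviceSyncLatency"},
	}
	tagChurn(samples, &start, &end)
	fmt.Printf("%+v\n", samples) // only the second sample is tagged
}
```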

## Related Tickets & Documents
- Related Issue: kube-burner#594
- Closes: https://issues.redhat.com/browse/PERFSCALE-2840

## Testing
Tested locally with the command below and observed the behavior before
and after churn.
```
kube-burner init -c kube-burner.yml --uuid="${UUID}" --log-level=debug -m metrics-profile.yaml -u $PROM_URL -t $PROM_TOKEN -a alert-profile.yaml
```
If churn is enabled, a `churnMetric: true` flag shows up in the relevant
metrics; otherwise the field is omitted entirely. Keeping it this way keeps
the indexed document content compact.

Fields in jobSummary:
```
    "churnStartTimestamp": "2024-02-24T23:42:12.259757696Z",
    "churnEndTimestamp": "2024-02-24T23:44:04.688532369Z",
```
Fields in metrics:
```
  "query": "histogram_quantile(0.99, sum(rate(ovnkube_master_network_programming_duration_seconds_bucket{kind=\"service\"}[2m])) by (le))",
  "churnMetric": true,
  "metricName": "serviceSyncLatency",
```
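
For illustration, a sketch of how a downstream consumer of the indexed documents could drop churn-window samples using the new field. The `doc` struct and `dropChurn` helper are hypothetical and not part of this change; only the `churnMetric` field shape is taken from the output above.
```go
package main

import (
	"encoding/json"
	"fmt"
)

// doc is a minimal view of an indexed metric document; fields not needed
// for filtering are simply ignored by json.Unmarshal.
type doc struct {
	MetricName  string  `json:"metricName"`
	Value       float64 `json:"value"`
	ChurnMetric bool    `json:"churnMetric"`
}

// dropChurn keeps only the steady-state samples, i.e. documents that do not
// carry churnMetric: true.
func dropChurn(raw [][]byte) ([]doc, error) {
	var steady []doc
	for _, b := range raw {
		var d doc
		if err := json.Unmarshal(b, &d); err != nil {
			return nil, err
		}
		if !d.ChurnMetric {
			steady = append(steady, d)
		}
	}
	return steady, nil
}

func main() {
	raw := [][]byte{
		[]byte(`{"metricName":"serviceSyncLatency","value":0.12}`),
		[]byte(`{"metricName":"serviceSyncLatency","value":0.98,"churnMetric":true}`),
	}
	steady, err := dropChurn(raw)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%d steady-state sample(s)\n", len(steady)) // prints 1
}
```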

---------

Signed-off-by: Vishnu Challa <[email protected]>
Co-authored-by: Vishnu Challa <[email protected]>
vishnuchalla and Vishnu Challa authored Mar 17, 2024
1 parent 1aba04c commit f51585d
Showing 8 changed files with 62 additions and 44 deletions.
cmd/kube-burner/kube-burner.go (4 changes: 2 additions & 2 deletions)
@@ -414,7 +414,7 @@ func importCmd() *cobra.Command {
if err != nil {
log.Fatal(err.Error())
}
-err = metrics.ImportTarball(tarball, indexer, indexerConfig.MetricsDirectory)
+err = metrics.ImportTarball(tarball, indexer)
if err != nil {
log.Fatal(err.Error())
}
@@ -479,7 +479,7 @@ func alertCmd() *cobra.Command {
if alertM, err = alerting.NewAlertManager(alertProfile, uuid, p, false, *indexer); err != nil {
log.Fatalf("Error creating alert manager: %s", err)
}
-err = alertM.Evaluate(startTime, endTime)
+err = alertM.Evaluate(startTime, endTime, nil, nil)
log.Info("👋 Exiting kube-burner ", uuid)
if err != nil {
os.Exit(1)
pkg/alerting/alert_manager.go (15 changes: 10 additions & 5 deletions)
@@ -59,6 +59,7 @@ type alert struct {
Severity severityLevel `json:"severity"`
Description string `json:"description"`
MetricName string `json:"metricName"`
+ChurnMetric bool `json:"churnMetric,omitempty"`
}

// AlertManager configuration
@@ -114,7 +115,7 @@ func (a *AlertManager) readProfile(alertProfileCfg string, embedConfig bool) err
}

// Evaluate evaluates expressions
-func (a *AlertManager) Evaluate(start, end time.Time) error {
+func (a *AlertManager) Evaluate(start, end time.Time, churnStart, churnEnd *time.Time) error {
errs := []error{}
log.Infof("Evaluating alerts for prometheus: %v", a.prometheus.Endpoint)
var alertList []interface{}
@@ -133,7 +134,7 @@ func (a *AlertManager) Evaluate(start, end time.Time) error {
log.Warnf("Error performing query %s: %s", expr, err)
continue
}
-alertData, err := parseMatrix(v, alert.Description, alert.Severity)
+alertData, err := parseMatrix(v, alert.Description, alert.Severity, churnStart, churnEnd)
if err != nil {
log.Error(err.Error())
errs = append(errs, err)
@@ -158,7 +159,7 @@ func (a *AlertManager) validateTemplates() error {
return nil
}

-func parseMatrix(value model.Value, description string, severity severityLevel) ([]alert, error) {
+func parseMatrix(value model.Value, description string, severity severityLevel, churnStart, churnEnd *time.Time) ([]alert, error) {
var renderedDesc bytes.Buffer
var templateData descriptionTemplate
// The same query can fire multiple alerts, so we have to return an array of them
@@ -184,12 +185,16 @@ func parseMatrix(value model.Value, description string, severity severityLevel)
errs = append(errs, err)
}
msg := fmt.Sprintf("alert at %v: '%s'", val.Timestamp.Time().UTC().Format(time.RFC3339), renderedDesc.String())
-alertSet = append(alertSet, alert{
+alert := alert{
Timestamp: val.Timestamp.Time().UTC(),
Severity: severity,
Description: renderedDesc.String(),
MetricName: alertMetricName,
-})
+}
+if churnStart != nil && alert.Timestamp.After(*churnStart) && alert.Timestamp.Before(*churnEnd) {
+alert.ChurnMetric = true
+}
+alertSet = append(alertSet, alert)
switch severity {
case sevWarn:
log.Warnf("🚨 %s", msg)
pkg/burner/job.go (14 changes: 13 additions & 1 deletion)
@@ -99,6 +99,8 @@ func Run(configSpec config.Spec, kubeClientProvider *config.KubeClientProvider,
log.Infof("🔥 Starting kube-burner (%s@%s) with UUID %s", version.Version, version.GitCommit, uuid)
go func() {
var innerRC int
+var churnStart *time.Time
+var churnEnd *time.Time
measurements.NewMeasurementFactory(configSpec, metricsScraper.Metadata)
jobList = newExecutorList(configSpec, kubeClientProvider, uuid, timeout)
ClientSet, restConfig = kubeClientProvider.DefaultClientSet()
@@ -156,7 +158,9 @@ func Run(configSpec config.Spec, kubeClientProvider *config.KubeClientProvider,
log.Error(err.Error())
}
if job.Churn {
+currentJob.ChurnStart = time.Now().UTC()
job.RunCreateJobWithChurn()
+currentJob.ChurnEnd = time.Now().UTC()
}
globalWaitMap[strconv.Itoa(jobPosition)+job.Name] = waitListNamespaces
executorMap[strconv.Itoa(jobPosition)+job.Name] = job
@@ -245,12 +249,20 @@ func Run(configSpec config.Spec, kubeClientProvider *config.KubeClientProvider,
}
if !job.JobConfig.SkipIndexing {
for _, indexer := range metricsScraper.IndexerList {
+if job.JobConfig.Churn {
+jobTimings.ChurnStartTimestamp = &job.ChurnStart
+jobTimings.ChurnEndTimestamp = &job.ChurnEnd
+if churnStart == nil {
+churnStart = &job.ChurnStart
+}
+churnEnd = &job.ChurnEnd
+}
indexjobSummaryInfo(indexer, uuid, jobTimings, job.JobConfig, metricsScraper.Metadata)
}
}
}
for _, alertM := range metricsScraper.AlertMs {
-if err := alertM.Evaluate(executedJobs[0].Start, executedJobs[len(jobList)-1].End); err != nil {
+if err := alertM.Evaluate(executedJobs[0].Start, executedJobs[len(jobList)-1].End, churnStart, churnEnd); err != nil {
errs = append(errs, err)
innerRC = 1
}
pkg/burner/metadata.go (8 changes: 5 additions & 3 deletions)
@@ -25,9 +25,11 @@ import (
)

type timings struct {
-Timestamp time.Time `json:"timestamp"`
-EndTimestamp time.Time `json:"endTimestamp"`
-ElapsedTime float64 `json:"elapsedTime"`
+Timestamp time.Time `json:"timestamp"`
+EndTimestamp time.Time `json:"endTimestamp"`
+ChurnStartTimestamp *time.Time `json:"churnStartTimestamp,omitempty"`
+ChurnEndTimestamp *time.Time `json:"churnEndTimestamp,omitempty"`
+ElapsedTime float64 `json:"elapsedTime"`
}

type jobSummary struct {
pkg/prometheus/prometheus.go (31 changes: 17 additions & 14 deletions)
@@ -84,14 +84,14 @@ func (p *Prometheus) ScrapeJobsMetrics(jobList ...Job) error {
query := renderedQuery.String()
renderedQuery.Reset()
if md.Instant {
-docsToIndex[md.MetricName+"-start"] = append(docsToIndex[md.MetricName+"-start"], p.runInstantQuery(query, md.MetricName+"-start", jobStart, eachJob.JobConfig)...)
-docsToIndex[md.MetricName] = append(docsToIndex[md.MetricName], p.runInstantQuery(query, md.MetricName, jobEnd, eachJob.JobConfig)...)
+docsToIndex[md.MetricName+"-start"] = append(docsToIndex[md.MetricName+"-start"], p.runInstantQuery(query, md.MetricName+"-start", jobStart, eachJob)...)
+docsToIndex[md.MetricName] = append(docsToIndex[md.MetricName], p.runInstantQuery(query, md.MetricName, jobEnd, eachJob)...)
} else {
requiresInstant = ((jobEnd.Sub(jobStart).Milliseconds())%(p.Step.Milliseconds()) != 0)
-docsToIndex[md.MetricName] = append(docsToIndex[md.MetricName], p.runRangeQuery(query, md.MetricName, jobStart, jobEnd, eachJob.JobConfig)...)
+docsToIndex[md.MetricName] = append(docsToIndex[md.MetricName], p.runRangeQuery(query, md.MetricName, jobStart, jobEnd, eachJob)...)
}
if requiresInstant {
-docsToIndex[md.MetricName] = append(docsToIndex[md.MetricName], p.runInstantQuery(query, md.MetricName, jobEnd, eachJob.JobConfig)...)
+docsToIndex[md.MetricName] = append(docsToIndex[md.MetricName], p.runInstantQuery(query, md.MetricName, jobEnd, eachJob)...)
}
}
}
@@ -100,27 +100,27 @@ func (p *Prometheus) ScrapeJobsMetrics(jobList ...Job) error {
}

// Parse vector parses results for an instant query
-func (p *Prometheus) parseVector(metricName, query string, jobConfig config.Job, value model.Value, metrics *[]interface{}) error {
+func (p *Prometheus) parseVector(metricName, query string, job Job, value model.Value, metrics *[]interface{}) error {
data, ok := value.(model.Vector)
if !ok {
return fmt.Errorf("unsupported result format: %s", value.Type().String())
}
for _, vector := range data {
-m := p.createMetric(query, metricName, jobConfig, vector.Metric, vector.Value, vector.Timestamp.Time().UTC())
+m := p.createMetric(query, metricName, job, vector.Metric, vector.Value, vector.Timestamp.Time().UTC(), true)
*metrics = append(*metrics, m)
}
return nil
}

// Parse matrix parses results for an non-instant query
-func (p *Prometheus) parseMatrix(metricName, query string, jobConfig config.Job, value model.Value, metrics *[]interface{}) error {
+func (p *Prometheus) parseMatrix(metricName, query string, job Job, value model.Value, metrics *[]interface{}) error {
data, ok := value.(model.Matrix)
if !ok {
return fmt.Errorf("unsupported result format: %s", value.Type().String())
}
for _, matrix := range data {
for _, val := range matrix.Values {
-m := p.createMetric(query, metricName, jobConfig, matrix.Metric, val.Value, val.Timestamp.Time().UTC())
+m := p.createMetric(query, metricName, job, matrix.Metric, val.Value, val.Timestamp.Time().UTC(), false)
*metrics = append(*metrics, m)
}
}
@@ -158,13 +158,13 @@ func (p *Prometheus) ReadProfile(metricsProfile string) error {
}

// Create metric creates metric to be indexed
-func (p *Prometheus) createMetric(query, metricName string, jobConfig config.Job, labels model.Metric, value model.SampleValue, timestamp time.Time) metric {
+func (p *Prometheus) createMetric(query, metricName string, job Job, labels model.Metric, value model.SampleValue, timestamp time.Time, isInstant bool) metric {
m := metric{
Labels: make(map[string]string),
UUID: p.UUID,
Query: query,
MetricName: metricName,
-JobConfig: jobConfig,
+JobConfig: job.JobConfig,
Timestamp: timestamp,
Metadata: p.metadata,
}
@@ -178,11 +178,14 @@ func (p *Prometheus) createMetric(query, metricName string, jobConfig config.Job
} else {
m.Value = float64(value)
}
+if !isInstant && timestamp.After(job.ChurnStart) && timestamp.Before(job.ChurnEnd) {
+m.ChurnMetric = true
+}
return m
}

// runInstantQuery function to run an instant query
-func (p *Prometheus) runInstantQuery(query, metricName string, timestamp time.Time, jobConfig config.Job) []interface{} {
+func (p *Prometheus) runInstantQuery(query, metricName string, timestamp time.Time, job Job) []interface{} {
var v model.Value
var err error
var datapoints []interface{}
@@ -191,14 +194,14 @@ func (p *Prometheus) runInstantQuery(query, metricName string, timestamp time.Ti
log.Warnf("Error found with query %s: %s", query, err)
return []interface{}{}
}
-if err = p.parseVector(metricName, query, jobConfig, v, &datapoints); err != nil {
+if err = p.parseVector(metricName, query, job, v, &datapoints); err != nil {
log.Warnf("Error found parsing result from query %s: %s", query, err)
}
return datapoints
}

// runRangeQuery function to run a range query
-func (p *Prometheus) runRangeQuery(query, metricName string, jobStart, jobEnd time.Time, jobConfig config.Job) []interface{} {
+func (p *Prometheus) runRangeQuery(query, metricName string, jobStart, jobEnd time.Time, job Job) []interface{} {
var v model.Value
var err error
var datapoints []interface{}
@@ -208,7 +211,7 @@ func (p *Prometheus) runRangeQuery(query, metricName string, jobStart, jobEnd ti
log.Warnf("Error found with query %s: %s", query, err)
return []interface{}{}
}
-if err = p.parseMatrix(metricName, query, jobConfig, v, &datapoints); err != nil {
+if err = p.parseMatrix(metricName, query, job, v, &datapoints); err != nil {
log.Warnf("Error found parsing result from query %s: %s", query, err)
}
return datapoints
pkg/prometheus/types.go (25 changes: 14 additions & 11 deletions)
@@ -44,9 +44,11 @@ type Prometheus struct {
}

type Job struct {
-Start time.Time
-End time.Time
-JobConfig config.Job
+Start time.Time
+End time.Time
+ChurnStart time.Time
+ChurnEnd time.Time
+JobConfig config.Job
}

// metricDefinition describes what metrics kube-burner collects
@@ -57,12 +59,13 @@ type metricDefinition struct {
}

type metric struct {
-Timestamp time.Time `json:"timestamp"`
-Labels map[string]string `json:"labels,omitempty"`
-Value float64 `json:"value"`
-UUID string `json:"uuid"`
-Query string `json:"query"`
-MetricName string `json:"metricName,omitempty"`
-JobConfig config.Job `json:"jobConfig,omitempty"`
-Metadata interface{} `json:"metadata,omitempty"`
+Timestamp time.Time `json:"timestamp"`
+Labels map[string]string `json:"labels,omitempty"`
+Value float64 `json:"value"`
+UUID string `json:"uuid"`
+Query string `json:"query"`
+ChurnMetric bool `json:"churnMetric,omitempty"`
+MetricName string `json:"metricName,omitempty"`
+JobConfig config.Job `json:"jobConfig,omitempty"`
+Metadata interface{} `json:"metadata,omitempty"`
}
pkg/util/metrics/tarball.go (2 changes: 1 addition & 1 deletion)
@@ -67,7 +67,7 @@ func CreateTarball(indexerConfig indexers.IndexerConfig) error {
return nil
}

-func ImportTarball(tarball string, indexer *indexers.Indexer, metricsDir string) error {
+func ImportTarball(tarball string, indexer *indexers.Indexer) error {
log.Infof("Importing tarball: %v", tarball)
var rawData bytes.Buffer
tarballFile, err := os.Open(tarball)
pkg/util/metrics/utils.go (7 changes: 0 additions & 7 deletions)
@@ -23,13 +23,6 @@ import (
"gopkg.in/yaml.v3"
)

-// Performs the validity check of metrics endpoint and prometheus url
-func validateMetricsEndpoint(metricsEndpoint string, prometheusURL string) {
-if (metricsEndpoint != "" && prometheusURL != "") || (metricsEndpoint == "" && prometheusURL == "") {
-log.Fatal("Please use either of --metrics-endpoint or --prometheus-url flags to fetch metrics or alerts")
-}
-}

// Decodes metrics endpoint yaml file
func DecodeMetricsEndpoint(metricsEndpoint string, metricsEndpoints *[]metricEndpoint) {
f, err := util.ReadConfig(metricsEndpoint)
