Skip to content

Commit

Permalink
fix: Analysis argument validation (#1412)
Browse files Browse the repository at this point in the history
* fix: Analysis argument validation

Signed-off-by: khhirani <[email protected]>
  • Loading branch information
khhirani authored and alexmt committed Aug 26, 2021
1 parent 4022cb8 commit 58994c9
Show file tree
Hide file tree
Showing 7 changed files with 315 additions and 60 deletions.
81 changes: 58 additions & 23 deletions analysis/analysis.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"sync"
"time"

"k8s.io/utils/pointer"

log "github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -43,20 +45,31 @@ func (c *Controller) reconcileAnalysisRun(origRun *v1alpha1.AnalysisRun) *v1alph

if run.Status.MetricResults == nil {
run.Status.MetricResults = make([]v1alpha1.MetricResult, 0)
err := analysisutil.ValidateMetrics(run.Spec.Metrics)
if err != nil {
message := fmt.Sprintf("analysis spec invalid: %v", err)
log.Warn(message)
run.Status.Phase = v1alpha1.AnalysisPhaseError
run.Status.Message = message
c.recordAnalysisRunCompletionEvent(run)
return run
}
}

tasks := generateMetricTasks(run)
resolvedMetrics, err := getResolvedMetricsWithoutSecrets(run.Spec.Metrics, run.Spec.Args)
if err != nil {
message := fmt.Sprintf("unable to resolve metric arguments: %v", err)
log.Warn(message)
run.Status.Phase = v1alpha1.AnalysisPhaseError
run.Status.Message = message
c.recordAnalysisRunCompletionEvent(run)
return run
}

err = analysisutil.ValidateMetrics(resolvedMetrics)
if err != nil {
message := fmt.Sprintf("analysis spec invalid: %v", err)
log.Warn(message)
run.Status.Phase = v1alpha1.AnalysisPhaseError
run.Status.Message = message
c.recordAnalysisRunCompletionEvent(run)
return run
}

tasks := generateMetricTasks(run, resolvedMetrics)
log.Infof("taking %d measurements", len(tasks))
err := c.runMeasurements(run, tasks)
err = c.runMeasurements(run, tasks)
if err != nil {
message := fmt.Sprintf("unable to resolve metric arguments: %v", err)
log.Warn(message)
Expand All @@ -66,7 +79,7 @@ func (c *Controller) reconcileAnalysisRun(origRun *v1alpha1.AnalysisRun) *v1alph
return run
}

newStatus, newMessage := c.assessRunStatus(run)
newStatus, newMessage := c.assessRunStatus(run, resolvedMetrics)
if newStatus != run.Status.Phase {
run.Status.Phase = newStatus
run.Status.Message = newMessage
Expand All @@ -81,7 +94,7 @@ func (c *Controller) reconcileAnalysisRun(origRun *v1alpha1.AnalysisRun) *v1alph
log.Warnf("Failed to garbage collect measurements: %v", err)
}

nextReconcileTime := calculateNextReconcileTime(run)
nextReconcileTime := calculateNextReconcileTime(run, resolvedMetrics)
if nextReconcileTime != nil {
enqueueSeconds := nextReconcileTime.Sub(time.Now())
if enqueueSeconds < 0 {
Expand All @@ -93,6 +106,27 @@ func (c *Controller) reconcileAnalysisRun(origRun *v1alpha1.AnalysisRun) *v1alph
return run
}

func getResolvedMetricsWithoutSecrets(metrics []v1alpha1.Metric, args []v1alpha1.Argument) ([]v1alpha1.Metric, error) {
newArgs := make([]v1alpha1.Argument, 0)
for _, arg := range args {
newArg := arg.DeepCopy()
if newArg.ValueFrom != nil && newArg.ValueFrom.SecretKeyRef != nil {
newArg.ValueFrom = nil
newArg.Value = pointer.StringPtr("temp-for-secret")
}
newArgs = append(newArgs, *newArg)
}
resolvedMetrics := make([]v1alpha1.Metric, 0)
for _, metric := range metrics {
resolvedMetric, err := analysisutil.ResolveMetricArgs(metric, newArgs)
if err != nil {
return nil, err
}
resolvedMetrics = append(resolvedMetrics, *resolvedMetric)
}
return resolvedMetrics, nil
}

func (c *Controller) recordAnalysisRunCompletionEvent(run *v1alpha1.AnalysisRun) {
eventType := corev1.EventTypeNormal
switch run.Status.Phase {
Expand All @@ -106,11 +140,12 @@ func (c *Controller) recordAnalysisRunCompletionEvent(run *v1alpha1.AnalysisRun)
// sync, based on the last completion times that metric was measured (if ever). If the run is
// terminating (e.g. due to manual termination or failing metric), will not schedule further
// measurements other than to resume any in-flight measurements.
func generateMetricTasks(run *v1alpha1.AnalysisRun) []metricTask {
func generateMetricTasks(run *v1alpha1.AnalysisRun, metrics []v1alpha1.Metric) []metricTask {
log := logutil.WithAnalysisRun(run)
var tasks []metricTask
terminating := analysisutil.IsTerminating(run)
for _, metric := range run.Spec.Metrics {

for i, metric := range metrics {
if analysisutil.MetricCompleted(run, metric.Name) {
continue
}
Expand All @@ -124,7 +159,7 @@ func generateMetricTasks(run *v1alpha1.AnalysisRun) []metricTask {
// last measurement is still in-progress. need to complete it
logCtx.Infof("resuming in-progress measurement")
tasks = append(tasks, metricTask{
metric: metric,
metric: run.Spec.Metrics[i],
incompleteMeasurement: lastMeasurement,
})
continue
Expand All @@ -149,7 +184,7 @@ func generateMetricTasks(run *v1alpha1.AnalysisRun) []metricTask {
}
}
// measurement never taken
tasks = append(tasks, metricTask{metric: metric})
tasks = append(tasks, metricTask{metric: run.Spec.Metrics[i]})
logCtx.Infof("running initial measurement")
continue
}
Expand All @@ -174,7 +209,7 @@ func generateMetricTasks(run *v1alpha1.AnalysisRun) []metricTask {
interval = metricInterval
}
if time.Now().After(lastMeasurement.FinishedAt.Add(interval)) {
tasks = append(tasks, metricTask{metric: metric})
tasks = append(tasks, metricTask{metric: run.Spec.Metrics[i]})
logCtx.Infof("running overdue measurement")
continue
}
Expand Down Expand Up @@ -238,7 +273,7 @@ func (c *Controller) runMeasurements(run *v1alpha1.AnalysisRun, tasks []metricTa
var resultsLock sync.Mutex
terminating := analysisutil.IsTerminating(run)

// resolve args for metricTasks
// resolve args for metric tasks
// get list of secret values for log redaction
tasks, secrets, err := c.resolveArgs(tasks, run.Spec.Args, run.Namespace)
if err != nil {
Expand Down Expand Up @@ -345,7 +380,7 @@ func (c *Controller) runMeasurements(run *v1alpha1.AnalysisRun, tasks []metricTa
// assessRunStatus assesses the overall status of this AnalysisRun
// If any metric is not yet completed, the AnalysisRun is still considered Running
// Once all metrics are complete, the worst status is used as the overall AnalysisRun status
func (c *Controller) assessRunStatus(run *v1alpha1.AnalysisRun) (v1alpha1.AnalysisPhase, string) {
func (c *Controller) assessRunStatus(run *v1alpha1.AnalysisRun, metrics []v1alpha1.Metric) (v1alpha1.AnalysisPhase, string) {
var worstStatus v1alpha1.AnalysisPhase
var worstMessage string
terminating := analysisutil.IsTerminating(run)
Expand All @@ -360,7 +395,7 @@ func (c *Controller) assessRunStatus(run *v1alpha1.AnalysisRun) (v1alpha1.Analys
}

// Iterate all metrics and update MetricResult.Phase fields based on latest measurement(s)
for _, metric := range run.Spec.Metrics {
for _, metric := range metrics {
if result := analysisutil.GetResult(run, metric.Name); result != nil {
log := logutil.WithAnalysisRun(run).WithField("metric", metric.Name)
metricStatus := assessMetricStatus(metric, *result, terminating)
Expand Down Expand Up @@ -497,9 +532,9 @@ func assessMetricFailureInconclusiveOrError(metric v1alpha1.Metric, result v1alp

// calculateNextReconcileTime calculates the next time that this AnalysisRun should be reconciled,
// based on the earliest time of all metrics intervals, counts, and their finishedAt timestamps
func calculateNextReconcileTime(run *v1alpha1.AnalysisRun) *time.Time {
func calculateNextReconcileTime(run *v1alpha1.AnalysisRun, metrics []v1alpha1.Metric) *time.Time {
var reconcileTime *time.Time
for _, metric := range run.Spec.Metrics {
for _, metric := range metrics {
if analysisutil.MetricCompleted(run, metric.Name) {
// NOTE: this also covers the case where metric.Count is reached
continue
Expand Down
Loading

0 comments on commit 58994c9

Please sign in to comment.