Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

fix: Analysis argument validation #1412

Merged
merged 16 commits into from
Aug 25, 2021
81 changes: 58 additions & 23 deletions analysis/analysis.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"sync"
"time"

"k8s.io/utils/pointer"

log "github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -43,20 +45,31 @@ func (c *Controller) reconcileAnalysisRun(origRun *v1alpha1.AnalysisRun) *v1alph

if run.Status.MetricResults == nil {
run.Status.MetricResults = make([]v1alpha1.MetricResult, 0)
err := analysisutil.ValidateMetrics(run.Spec.Metrics)
if err != nil {
message := fmt.Sprintf("analysis spec invalid: %v", err)
log.Warn(message)
run.Status.Phase = v1alpha1.AnalysisPhaseError
run.Status.Message = message
c.recordAnalysisRunCompletionEvent(run)
return run
}
}

tasks := generateMetricTasks(run)
resolvedMetrics, err := getResolvedMetricsWithoutSecrets(run.Spec.Metrics, run.Spec.Args)
if err != nil {
message := fmt.Sprintf("unable to resolve metric arguments: %v", err)
log.Warn(message)
run.Status.Phase = v1alpha1.AnalysisPhaseError
run.Status.Message = message
c.recordAnalysisRunCompletionEvent(run)
return run
}

err = analysisutil.ValidateMetrics(resolvedMetrics)
if err != nil {
message := fmt.Sprintf("analysis spec invalid: %v", err)
log.Warn(message)
run.Status.Phase = v1alpha1.AnalysisPhaseError
run.Status.Message = message
c.recordAnalysisRunCompletionEvent(run)
return run
}

tasks := generateMetricTasks(run, resolvedMetrics)
log.Infof("taking %d measurements", len(tasks))
err := c.runMeasurements(run, tasks)
err = c.runMeasurements(run, tasks)
if err != nil {
message := fmt.Sprintf("unable to resolve metric arguments: %v", err)
log.Warn(message)
Expand All @@ -66,7 +79,7 @@ func (c *Controller) reconcileAnalysisRun(origRun *v1alpha1.AnalysisRun) *v1alph
return run
}

newStatus, newMessage := c.assessRunStatus(run)
newStatus, newMessage := c.assessRunStatus(run, resolvedMetrics)
if newStatus != run.Status.Phase {
run.Status.Phase = newStatus
run.Status.Message = newMessage
Expand All @@ -81,7 +94,7 @@ func (c *Controller) reconcileAnalysisRun(origRun *v1alpha1.AnalysisRun) *v1alph
log.Warnf("Failed to garbage collect measurements: %v", err)
}

nextReconcileTime := calculateNextReconcileTime(run)
nextReconcileTime := calculateNextReconcileTime(run, resolvedMetrics)
if nextReconcileTime != nil {
enqueueSeconds := nextReconcileTime.Sub(time.Now())
if enqueueSeconds < 0 {
Expand All @@ -93,6 +106,27 @@ func (c *Controller) reconcileAnalysisRun(origRun *v1alpha1.AnalysisRun) *v1alph
return run
}

// getResolvedMetricsWithoutSecrets resolves args into each of the given metrics via
// analysisutil.ResolveMetricArgs without reading any secret.
//
// Each argument is deep-copied first. On the copy, an argument whose
// ValueFrom.SecretKeyRef is set has its ValueFrom cleared and its Value replaced
// by a fixed placeholder string, so the resolved metrics contain the placeholder
// wherever a secret-backed argument is referenced.
//
// It returns the resolved metrics in the same order as metrics. If resolving any
// metric fails, it returns a nil slice together with that error.
func getResolvedMetricsWithoutSecrets(metrics []v1alpha1.Metric, args []v1alpha1.Argument) ([]v1alpha1.Metric, error) {
	// Placeholder substituted for the value of secret-backed arguments.
	const secretPlaceholder = "temp-for-secret"

	// The final length is known up front, so size the slice once instead of
	// growing it on every append. Index into args to avoid copying each
	// Argument into a loop variable before deep-copying it again.
	newArgs := make([]v1alpha1.Argument, 0, len(args))
	for i := range args {
		newArg := args[i].DeepCopy()
		if newArg.ValueFrom != nil && newArg.ValueFrom.SecretKeyRef != nil {
			newArg.ValueFrom = nil
			newArg.Value = pointer.StringPtr(secretPlaceholder)
		}
		newArgs = append(newArgs, *newArg)
	}

	resolvedMetrics := make([]v1alpha1.Metric, 0, len(metrics))
	for _, metric := range metrics {
		resolvedMetric, err := analysisutil.ResolveMetricArgs(metric, newArgs)
		if err != nil {
			return nil, err
		}
		resolvedMetrics = append(resolvedMetrics, *resolvedMetric)
	}
	return resolvedMetrics, nil
}

func (c *Controller) recordAnalysisRunCompletionEvent(run *v1alpha1.AnalysisRun) {
eventType := corev1.EventTypeNormal
switch run.Status.Phase {
Expand All @@ -106,11 +140,12 @@ func (c *Controller) recordAnalysisRunCompletionEvent(run *v1alpha1.AnalysisRun)
// sync, based on the last completion times that metric was measured (if ever). If the run is
// terminating (e.g. due to manual termination or failing metric), will not schedule further
// measurements other than to resume any in-flight measurements.
func generateMetricTasks(run *v1alpha1.AnalysisRun) []metricTask {
func generateMetricTasks(run *v1alpha1.AnalysisRun, metrics []v1alpha1.Metric) []metricTask {
log := logutil.WithAnalysisRun(run)
var tasks []metricTask
terminating := analysisutil.IsTerminating(run)
for _, metric := range run.Spec.Metrics {

for i, metric := range metrics {
if analysisutil.MetricCompleted(run, metric.Name) {
continue
}
Expand All @@ -124,7 +159,7 @@ func generateMetricTasks(run *v1alpha1.AnalysisRun) []metricTask {
// last measurement is still in-progress. need to complete it
logCtx.Infof("resuming in-progress measurement")
tasks = append(tasks, metricTask{
metric: metric,
metric: run.Spec.Metrics[i],
incompleteMeasurement: lastMeasurement,
})
continue
Expand All @@ -149,7 +184,7 @@ func generateMetricTasks(run *v1alpha1.AnalysisRun) []metricTask {
}
}
// measurement never taken
tasks = append(tasks, metricTask{metric: metric})
tasks = append(tasks, metricTask{metric: run.Spec.Metrics[i]})
logCtx.Infof("running initial measurement")
continue
}
Expand All @@ -174,7 +209,7 @@ func generateMetricTasks(run *v1alpha1.AnalysisRun) []metricTask {
interval = metricInterval
}
if time.Now().After(lastMeasurement.FinishedAt.Add(interval)) {
tasks = append(tasks, metricTask{metric: metric})
tasks = append(tasks, metricTask{metric: run.Spec.Metrics[i]})
logCtx.Infof("running overdue measurement")
continue
}
Expand Down Expand Up @@ -238,7 +273,7 @@ func (c *Controller) runMeasurements(run *v1alpha1.AnalysisRun, tasks []metricTa
var resultsLock sync.Mutex
terminating := analysisutil.IsTerminating(run)

// resolve args for metricTasks
// resolve args for metric tasks
// get list of secret values for log redaction
tasks, secrets, err := c.resolveArgs(tasks, run.Spec.Args, run.Namespace)
if err != nil {
Expand Down Expand Up @@ -345,7 +380,7 @@ func (c *Controller) runMeasurements(run *v1alpha1.AnalysisRun, tasks []metricTa
// assessRunStatus assesses the overall status of this AnalysisRun
// If any metric is not yet completed, the AnalysisRun is still considered Running
// Once all metrics are complete, the worst status is used as the overall AnalysisRun status
func (c *Controller) assessRunStatus(run *v1alpha1.AnalysisRun) (v1alpha1.AnalysisPhase, string) {
func (c *Controller) assessRunStatus(run *v1alpha1.AnalysisRun, metrics []v1alpha1.Metric) (v1alpha1.AnalysisPhase, string) {
var worstStatus v1alpha1.AnalysisPhase
var worstMessage string
terminating := analysisutil.IsTerminating(run)
Expand All @@ -360,7 +395,7 @@ func (c *Controller) assessRunStatus(run *v1alpha1.AnalysisRun) (v1alpha1.Analys
}

// Iterate all metrics and update MetricResult.Phase fields based on latest measurement(s)
for _, metric := range run.Spec.Metrics {
for _, metric := range metrics {
if result := analysisutil.GetResult(run, metric.Name); result != nil {
log := logutil.WithAnalysisRun(run).WithField("metric", metric.Name)
metricStatus := assessMetricStatus(metric, *result, terminating)
Expand Down Expand Up @@ -497,9 +532,9 @@ func assessMetricFailureInconclusiveOrError(metric v1alpha1.Metric, result v1alp

// calculateNextReconcileTime calculates the next time that this AnalysisRun should be reconciled,
// based on the earliest time of all metrics intervals, counts, and their finishedAt timestamps
func calculateNextReconcileTime(run *v1alpha1.AnalysisRun) *time.Time {
func calculateNextReconcileTime(run *v1alpha1.AnalysisRun, metrics []v1alpha1.Metric) *time.Time {
var reconcileTime *time.Time
for _, metric := range run.Spec.Metrics {
for _, metric := range metrics {
if analysisutil.MetricCompleted(run, metric.Name) {
// NOTE: this also covers the case where metric.Count is reached
continue
Expand Down
Loading