Skip to content

Commit

Permalink
fix: avoid writing to already closed channel; wait for all jobs to finish or time out in case of provider error
Browse files Browse the repository at this point in the history

Signed-off-by: Florian Bacher <[email protected]>
  • Loading branch information
bacherfl committed Sep 18, 2023
1 parent b7feb96 commit 9b0afbb
Show file tree
Hide file tree
Showing 5 changed files with 12 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func TestEvaluate(t *testing.T) {
},
},
providerRequest: metricstypes.ProviderRequest{
Objective: &metricsapi.Objective{
Objective: metricsapi.Objective{
AnalysisValueTemplateRef: metricsapi.ObjectReference{
Name: "mytemp",
Namespace: "default",
Expand Down Expand Up @@ -63,7 +63,7 @@ func TestEvaluate(t *testing.T) {
},
},
providerRequest: metricstypes.ProviderRequest{
Objective: &metricsapi.Objective{
Objective: metricsapi.Objective{
AnalysisValueTemplateRef: metricsapi.ObjectReference{
Name: "mytemp",
Namespace: "default",
Expand Down
12 changes: 4 additions & 8 deletions metrics-operator/controllers/analysis/provider_selector.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ func (ps ProvidersPool) StartProviders(ctx context.Context, numJobs int) {
ps.providers[provider] = channel
go ps.Evaluate(ctx, provider, channel)
}

}

func (ps ProvidersPool) DispatchToProviders(ctx context.Context, id int) {
Expand All @@ -67,8 +66,7 @@ func (ps ProvidersPool) DispatchToProviders(ctx context.Context, id int) {
if err != nil {
ps.log.Error(err, "Failed to get the correct Provider")
ps.results <- metricsapi.ProviderResult{Objective: j.AnalysisValueTemplateRef, ErrMsg: err.Error()}
ps.cancel()
return
continue
}

providerRef := &metricsapi.KeptnMetricsProvider{}
Expand All @@ -85,20 +83,18 @@ func (ps ProvidersPool) DispatchToProviders(ctx context.Context, id int) {
if err != nil {
ps.log.Error(err, "Failed to get Provider")
ps.results <- metricsapi.ProviderResult{Objective: j.AnalysisValueTemplateRef, ErrMsg: err.Error()}
ps.cancel()
return
continue
}

templatedQuery, err := generateQuery(templ.Spec.Query, ps.Analysis.Spec.Args)
if err != nil {
ps.log.Error(err, "Failed to substitute args in templ")
ps.results <- metricsapi.ProviderResult{Objective: j.AnalysisValueTemplateRef, ErrMsg: err.Error()}
ps.cancel()
return
continue
}
//send job to provider solver
ps.providers[providerRef.Spec.Type] <- metricstypes.ProviderRequest{
Objective: &j,
Objective: j,
Query: templatedQuery,
Provider: providerRef,
}
Expand Down
11 changes: 4 additions & 7 deletions metrics-operator/controllers/analysis/worker_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/pkg/errors"
"golang.org/x/net/context"
"sigs.k8s.io/controller-runtime/pkg/client"
"time"
)

//go:generate moq -pkg fake -skip-ensure -out ./fake/analysispool_mock.go . IAnalysisPool
Expand All @@ -23,7 +24,7 @@ func NewWorkersPool(ctx context.Context, analysis *metricsapi.Analysis, objectiv
if numJobs <= numWorkers { // do not start useless go routines
numWorkers = numJobs
}
_, cancel := context.WithCancel(ctx)
_, cancel := context.WithTimeout(ctx, 10*time.Second)
providerChans := make(map[string]chan metricstypes.ProviderRequest, len(providers.SupportedProviders))

assigner := TaskAssigner{tasks: objectives, numWorkers: numWorkers}
Expand Down Expand Up @@ -73,24 +74,20 @@ func (aw WorkersPool) DispatchAndCollect(ctx context.Context) (map[string]metric
func (aw WorkersPool) CollectAnalysisResults(ctx context.Context) (map[string]metricsapi.ProviderResult, error) {
var err error
results := make(map[string]metricsapi.ProviderResult, aw.numJobs)
loop:
for a := 1; a <= aw.numJobs; a++ {
select {
case <-ctx.Done():
err = errors.New("Collection terminated")
break loop
break
default:
res, err2 := aw.GetResult(ctx)
if err2 != nil {
err = err2
aw.cancel()
break loop
}
// TODO for some reason not all values are there
results[analysis.ComputeKey(res.Objective)] = *res
if res.ErrMsg != "" {
err = errors.New(res.ErrMsg)
aw.cancel()
break loop
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
)

type ProviderRequest struct {
Objective *v1alpha3.Objective
Objective v1alpha3.Objective
Query string
Provider *v1alpha3.KeptnMetricsProvider
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,4 @@ spec:
status:
pass: true
# yamllint disable-line rule:line-length
raw: '{"objectiveResults":[{"result":{"failResult":{"operator":{"lessThan":{"fixedValue":"2"}},"fulfilled":false},"warnResult":{"operator":{"lessThan":{"fixedValue":"3"}},"fulfilled":false},"warning":false,"pass":true},"value":4,"score":1}],"totalScore":1,"maximumScore":1,"pass":true,"warning":false}'
raw: '{"objectiveResults":[{"result":{"failResult":{"operator":{"lessThan":{"fixedValue":"5"}},"fulfilled":false},"warnResult":{"operator":{"lessThan":{"fixedValue":"4"}},"fulfilled":false},"warning":false,"pass":true},"value":11,"score":2},{"result":{"failResult":{"operator":{"greaterThan":{"fixedValue":"20"}},"fulfilled":false},"warnResult":{"operator":{"greaterThan":{"fixedValue":"15"}},"fulfilled":true},"warning":true,"pass":false},"value":20,"score":0.5},{"result":{"failResult":{"operator":{"notInRange":{"lowBound":"25","highBound":"35"}},"fulfilled":false},"warnResult":{"operator":{},"fulfilled":false},"warning":false,"pass":true},"value":30,"score":1}],"totalScore":3.5,"maximumScore":4,"pass":true,"warning":false}'

0 comments on commit 9b0afbb

Please sign in to comment.