Skip to content

Commit

Permalink
fix(metrics-operator): fix panic due to write attempt on closed chann…
Browse files Browse the repository at this point in the history
…el (#2119)

Signed-off-by: Florian Bacher <[email protected]>
  • Loading branch information
bacherfl authored Sep 19, 2023
1 parent 010d7cd commit 33eb9d7
Show file tree
Hide file tree
Showing 9 changed files with 389 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func TestEvaluate(t *testing.T) {
},
},
providerRequest: metricstypes.ProviderRequest{
Objective: &metricsapi.Objective{
Objective: metricsapi.Objective{
AnalysisValueTemplateRef: metricsapi.ObjectReference{
Name: "mytemp",
Namespace: "default",
Expand Down Expand Up @@ -63,7 +63,7 @@ func TestEvaluate(t *testing.T) {
},
},
providerRequest: metricstypes.ProviderRequest{
Objective: &metricsapi.Objective{
Objective: metricsapi.Objective{
AnalysisValueTemplateRef: metricsapi.ObjectReference{
Name: "mytemp",
Namespace: "default",
Expand Down
12 changes: 4 additions & 8 deletions metrics-operator/controllers/analysis/provider_selector.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ func (ps ProvidersPool) StartProviders(ctx context.Context, numJobs int) {
ps.providers[provider] = channel
go ps.Evaluate(ctx, provider, channel)
}

}

func (ps ProvidersPool) DispatchToProviders(ctx context.Context, id int) {
Expand All @@ -64,8 +63,7 @@ func (ps ProvidersPool) DispatchToProviders(ctx context.Context, id int) {
if err != nil {
ps.log.Error(err, "Failed to get AnalysisValueTemplate")
ps.results <- metricsapi.ProviderResult{Objective: j.AnalysisValueTemplateRef, ErrMsg: err.Error()}
ps.cancel()
return
continue
}

providerRef := &metricsapi.KeptnMetricsProvider{}
Expand All @@ -79,20 +77,18 @@ func (ps ProvidersPool) DispatchToProviders(ctx context.Context, id int) {
if err != nil {
ps.log.Error(err, "Failed to get KeptnMetricsProvider")
ps.results <- metricsapi.ProviderResult{Objective: j.AnalysisValueTemplateRef, ErrMsg: err.Error()}
ps.cancel()
return
continue
}

templatedQuery, err := generateQuery(templ.Spec.Query, ps.Analysis.Spec.Args)
if err != nil {
ps.log.Error(err, "Failed to substitute args in AnalysisValueTemplate")
ps.results <- metricsapi.ProviderResult{Objective: j.AnalysisValueTemplateRef, ErrMsg: err.Error()}
ps.cancel()
return
continue
}
//send job to provider solver
ps.providers[providerRef.Spec.Type] <- metricstypes.ProviderRequest{
Objective: &j,
Objective: j,
Query: templatedQuery,
Provider: providerRef,
}
Expand Down
20 changes: 9 additions & 11 deletions metrics-operator/controllers/analysis/worker_pool.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package analysis

import (
"time"

"github.com/go-logr/logr"
metricsapi "github.com/keptn/lifecycle-toolkit/metrics-operator/api/v1alpha3"
"github.com/keptn/lifecycle-toolkit/metrics-operator/controllers/common/analysis"
Expand All @@ -23,7 +25,7 @@ func NewWorkersPool(ctx context.Context, analysis *metricsapi.Analysis, objectiv
if numJobs <= numWorkers { // do not start useless go routines
numWorkers = numJobs
}
_, cancel := context.WithCancel(ctx)
_, cancel := context.WithTimeout(ctx, 10*time.Second)
providerChans := make(map[string]chan metricstypes.ProviderRequest, len(providers.SupportedProviders))

assigner := TaskAssigner{tasks: objectives, numWorkers: numWorkers}
Expand Down Expand Up @@ -73,24 +75,20 @@ func (aw WorkersPool) DispatchAndCollect(ctx context.Context) (map[string]metric
func (aw WorkersPool) CollectAnalysisResults(ctx context.Context) (map[string]metricsapi.ProviderResult, error) {
var err error
results := make(map[string]metricsapi.ProviderResult, aw.numJobs)
loop:
for a := 1; a <= aw.numJobs; a++ {
select {
case <-ctx.Done():
err = errors.New("Collection terminated")
break loop
break
default:
res, err2 := aw.GetResult(ctx)
if err2 != nil {
err = err2
aw.cancel()
break loop
}
results[analysis.ComputeKey(res.Objective)] = *res
if res.ErrMsg != "" {
err = errors.New(res.ErrMsg)
aw.cancel()
break loop
} else {
results[analysis.ComputeKey(res.Objective)] = *res
if res.ErrMsg != "" {
err = errors.New(res.ErrMsg)
}
}
}
}
Expand Down
38 changes: 38 additions & 0 deletions metrics-operator/controllers/analysis/worker_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,44 @@ func TestWorkersPool_CollectAnalysisResults(t *testing.T) {
require.Equal(t, res2, results["t2"])
}

func TestWorkersPool_CollectAnalysisResultsWithError(t *testing.T) {
// Create a fake WorkersPool instance for testing
resChan := make(chan metricsapi.ProviderResult, 2)
fakePool := WorkersPool{
IProvidersPool: ProvidersPool{
results: resChan,
},
numJobs: 2,
}

res1 := metricsapi.ProviderResult{
Objective: metricsapi.ObjectReference{Name: "t1"},
Value: "result1",
ErrMsg: "",
}

res2 := metricsapi.ProviderResult{
Objective: metricsapi.ObjectReference{Name: "t2"},
Value: "result2",
ErrMsg: "unexpected error",
}

// Create and send mock results to the results channel
go func() {
time.Sleep(time.Second)
resChan <- res1
resChan <- res2
}()

// Collect the results
results, err := fakePool.CollectAnalysisResults(context.TODO())

// Check the collected results
require.NotNil(t, err)
require.Equal(t, res1, results["t1"])
require.Equal(t, res2, results["t2"])
}

func TestWorkersPool_CollectAnalysisResultsTimeout(t *testing.T) {
// Create a fake WorkersPool instance for testing
resChan := make(chan metricsapi.ProviderResult, 2)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
)

type ProviderRequest struct {
Objective *v1alpha3.Objective
Objective v1alpha3.Objective
Query string
Provider *v1alpha3.KeptnMetricsProvider
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
apiVersion: kuttl.dev/v1beta1
kind: TestStep
commands:
- script: |
envsubst < mock-server.yaml | kubectl apply -f -
# substitutes current time and namespace, making sure they are changed to env var first
# to prevent bad files in case of a test interrupt
- script: |
envsubst < install.yaml | kubectl apply -f - -n $NAMESPACE
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
apiVersion: metrics.keptn.sh/v1alpha3
kind: Analysis
metadata:
name: analysis-sample
spec:
analysisDefinition:
name: ed-my-proj-dev-svc1
status:
pass: true
# yamllint disable-line rule:line-length
raw: '{"objectiveResults":[{"result":{"failResult":{"operator":{"lessThan":{"fixedValue":"5"}},"fulfilled":false},"warnResult":{"operator":{"lessThan":{"fixedValue":"4"}},"fulfilled":false},"warning":false,"pass":true},"value":11,"score":2},{"result":{"failResult":{"operator":{"greaterThan":{"fixedValue":"20"}},"fulfilled":false},"warnResult":{"operator":{"greaterThan":{"fixedValue":"15"}},"fulfilled":true},"warning":true,"pass":false},"value":20,"score":0.5},{"result":{"failResult":{"operator":{"notInRange":{"lowBound":"25","highBound":"35"}},"fulfilled":false},"warnResult":{"operator":{},"fulfilled":false},"warning":false,"pass":true},"value":30,"score":1}],"totalScore":3.5,"maximumScore":4,"pass":true,"warning":false}'
104 changes: 104 additions & 0 deletions test/integration/analysis-controller-multiple-providers/install.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
apiVersion: metrics.keptn.sh/v1alpha3
kind: AnalysisValueTemplate
metadata:
name: value-1
spec:
provider:
name: my-first-mocked-provider
query: 'query-1'
---
apiVersion: metrics.keptn.sh/v1alpha3
kind: AnalysisValueTemplate
metadata:
name: value-2
spec:
provider:
name: my-second-mocked-provider
query: 'query-2'
---
apiVersion: metrics.keptn.sh/v1alpha3
kind: AnalysisValueTemplate
metadata:
name: value-3
spec:
provider:
name: my-third-mocked-provider
query: 'query-3'
---
apiVersion: metrics.keptn.sh/v1alpha3
kind: AnalysisDefinition
metadata:
name: ed-my-proj-dev-svc1
spec:
objectives:
- analysisValueTemplateRef:
name: value-1
target:
failure:
lessThan:
fixedValue: 5
warning:
lessThan:
fixedValue: 4
weight: 2
keyObjective: false
- analysisValueTemplateRef:
name: value-2
target:
failure:
greaterThan:
fixedValue: 20
warning:
greaterThan:
fixedValue: 15
weight: 1
keyObjective: false
- analysisValueTemplateRef:
name: value-3
target:
failure:
notInRange:
lowBound: 25
highBound: 35
weight: 1
keyObjective: false
totalScore:
passPercentage: 75
warningPercentage: 50
---
apiVersion: metrics.keptn.sh/v1alpha3
kind: Analysis
metadata:
name: analysis-sample
spec:
timeframe:
from: 2023-09-14T07:33:19Z
to: 2023-09-14T07:33:19Z
args:
"ns": "keptn-lifecycle-toolkit-system"
analysisDefinition:
name: ed-my-proj-dev-svc1
---
apiVersion: metrics.keptn.sh/v1alpha3
kind: KeptnMetricsProvider
metadata:
name: my-first-mocked-provider
spec:
type: prometheus
targetServer: "http://mockserver.$NAMESPACE.svc.cluster.local:1080"
---
apiVersion: metrics.keptn.sh/v1alpha3
kind: KeptnMetricsProvider
metadata:
name: my-second-mocked-provider
spec:
type: prometheus
targetServer: "http://mockserver.$NAMESPACE.svc.cluster.local:1080"
---
apiVersion: metrics.keptn.sh/v1alpha3
kind: KeptnMetricsProvider
metadata:
name: my-third-mocked-provider
spec:
type: prometheus
targetServer: "http://mockserver.$NAMESPACE.svc.cluster.local:1080"
Loading

0 comments on commit 33eb9d7

Please sign in to comment.