Skip to content

Commit

Permalink
Try fix flaky tests, + use filepath.Join (netobserv#145)
Browse files Browse the repository at this point in the history
Flaky tests are due to the new status conditions recently added to the CRD: the CR kinda auto-updates its status and could cause a race when tests also update the CR in the meantime, resulting in this error message: "the object has been modified; please apply your changes to the latest version and try again"

To fix that, this patch provide a new idiomatic way to update CR in
tests, where it will first fetch the current CR until it is ready
(assuming than its status won't change "by itself" once it's ready)
  • Loading branch information
jotak authored Aug 3, 2022
1 parent f1b1989 commit 69c5e3e
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 89 deletions.
29 changes: 9 additions & 20 deletions controllers/flowcollector_controller_console_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,26 +179,16 @@ func flowCollectorConsolePluginSpecs() {
timeout, interval).Should(Equal("http://loki:3100/"))
})
It("Should update the Loki URL in the Console Plugin if it changes in the Spec", func() {
Expect(func() error {
upd := flowsv1alpha1.FlowCollector{}
if err := k8sClient.Get(ctx, crKey, &upd); err != nil {
return err
}
upd.Spec.Loki.URL = "http://loki.namespace:8888"
return k8sClient.Update(ctx, &upd)
}()).Should(Succeed())
UpdateCR(crKey, func(fc *flowsv1alpha1.FlowCollector) {
fc.Spec.Loki.URL = "http://loki.namespace:8888"
})
Eventually(getContainerArgumentAfter("network-observability-plugin", "-loki", cpKey),
timeout, interval).Should(Equal("http://loki.namespace:8888"))
})
It("Should use the Loki Querier URL instead of the Loki URL, if the first is defined", func() {
Expect(func() error {
upd := flowsv1alpha1.FlowCollector{}
if err := k8sClient.Get(ctx, crKey, &upd); err != nil {
return err
}
upd.Spec.Loki.QuerierURL = "http://loki-querier:6789"
return k8sClient.Update(ctx, &upd)
}()).Should(Succeed())
UpdateCR(crKey, func(fc *flowsv1alpha1.FlowCollector) {
fc.Spec.Loki.QuerierURL = "http://loki-querier:6789"
})
Eventually(getContainerArgumentAfter("network-observability-plugin", "-loki", cpKey),
timeout, interval).Should(Equal("http://loki-querier:6789"))
})
Expand All @@ -217,10 +207,9 @@ func flowCollectorConsolePluginSpecs() {

It("Should be unregistered", func() {
By("Update CR to unregister")
fc := flowsv1alpha1.FlowCollector{}
Expect(k8sClient.Get(ctx, crKey, &fc)).Should(Succeed())
fc.Spec.ConsolePlugin.Register = false
Expect(k8sClient.Update(ctx, &fc)).Should(Succeed())
UpdateCR(crKey, func(fc *flowsv1alpha1.FlowCollector) {
fc.Spec.ConsolePlugin.Register = false
})

By("Expecting the Console CR to not have plugin registered")
Eventually(func() interface{} {
Expand Down
11 changes: 5 additions & 6 deletions controllers/flowcollector_controller_ebpf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,12 +129,11 @@ func flowCollectorEBPFSpecs() {
})

It("Should update fields that have changed", func() {
updated := flowsv1alpha1.FlowCollector{}
Expect(k8sClient.Get(ctx, crKey, &updated)).Should(Succeed())
Expect(updated.Spec.EBPF.Sampling).To(Equal(int32(123)))
updated.Spec.EBPF.Sampling = 4
updated.Spec.EBPF.Privileged = true
Expect(k8sClient.Update(ctx, &updated)).Should(Succeed())
UpdateCR(crKey, func(fc *flowsv1alpha1.FlowCollector) {
Expect(fc.Spec.EBPF.Sampling).To(Equal(int32(123)))
fc.Spec.EBPF.Sampling = 4
fc.Spec.EBPF.Privileged = true
})

ds := appsv1.DaemonSet{}
By("expecting that the daemonset spec has eventually changed")
Expand Down
117 changes: 55 additions & 62 deletions controllers/flowcollector_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
ascv2 "k8s.io/api/autoscaling/v2beta2"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/pointer"
Expand All @@ -20,6 +21,7 @@ import (
"github.com/netobserv/network-observability-operator/controllers/constants"
. "github.com/netobserv/network-observability-operator/controllers/controllerstest"
"github.com/netobserv/network-observability-operator/controllers/flowlogspipeline"
"github.com/netobserv/network-observability-operator/pkg/conditions"
"github.com/netobserv/network-observability-operator/pkg/helper"
)

Expand Down Expand Up @@ -200,16 +202,11 @@ func flowCollectorControllerSpecs() {
})

It("Should update successfully", func() {
Eventually(func() error {
fc := flowsv1alpha1.FlowCollector{}
if err := k8sClient.Get(ctx, crKey, &fc); err != nil {
return err
}
UpdateCR(crKey, func(fc *flowsv1alpha1.FlowCollector) {
fc.Spec.IPFIX.CacheActiveTimeout = "30s"
fc.Spec.IPFIX.Sampling = 1234
fc.Spec.FlowlogsPipeline.Port = 1999
return k8sClient.Update(ctx, &fc)
}).Should(Succeed())
})

By("Expecting updated flowlogs-pipeline Service port")
Eventually(func() interface{} {
Expand Down Expand Up @@ -268,14 +265,9 @@ func flowCollectorControllerSpecs() {
})

It("Should redeploy if the spec doesn't change but the external flowlogs-pipeline-config does", func() {
Eventually(func() error {
fc := flowsv1alpha1.FlowCollector{}
if err := k8sClient.Get(ctx, crKey, &fc); err != nil {
return err
}
UpdateCR(crKey, func(fc *flowsv1alpha1.FlowCollector) {
fc.Spec.Loki.MaxRetries = 7
return k8sClient.Update(ctx, &fc)
}).Should(Succeed())
})

By("Expecting that the flowlogsPipeline.PodConfigurationDigest attribute has changed")
Eventually(func() error {
Expand All @@ -299,11 +291,10 @@ func flowCollectorControllerSpecs() {
Expect(hpa.Spec.MaxReplicas).To(Equal(int32(1)))
Expect(*hpa.Spec.Metrics[0].Resource.Target.AverageUtilization).To(Equal(int32(90)))
// update FlowCollector and verify that HPA spec also changed
fc := flowsv1alpha1.FlowCollector{}
Expect(k8sClient.Get(ctx, crKey, &fc)).To(Succeed())
fc.Spec.FlowlogsPipeline.HPA.MinReplicas = pointer.Int32(2)
fc.Spec.FlowlogsPipeline.HPA.MaxReplicas = 2
Expect(k8sClient.Update(ctx, &fc)).To(Succeed())
UpdateCR(crKey, func(fc *flowsv1alpha1.FlowCollector) {
fc.Spec.FlowlogsPipeline.HPA.MinReplicas = pointer.Int32(2)
fc.Spec.FlowlogsPipeline.HPA.MaxReplicas = 2
})

By("Changing the Horizontal Pod Autoscaler instance")
Eventually(func() error {
Expand All @@ -324,21 +315,19 @@ func flowCollectorControllerSpecs() {
Context("Deploying as DaemonSet", func() {
var oldConfigDigest string
It("Should update successfully", func() {
fc := flowsv1alpha1.FlowCollector{}
Expect(k8sClient.Get(ctx, crKey, &fc)).Should(Succeed())
fc.Spec.FlowlogsPipeline = flowsv1alpha1.FlowCollectorFLP{
Kind: "DaemonSet",
Port: 7891,
ImagePullPolicy: "Never",
LogLevel: "error",
Image: "testimg:latest",
}
fc.Spec.Loki = flowsv1alpha1.FlowCollectorLoki{}
fc.Spec.IPFIX = flowsv1alpha1.FlowCollectorIPFIX{
Sampling: 200,
}
// Update
Expect(k8sClient.Update(ctx, &fc)).Should(Succeed())
UpdateCR(crKey, func(fc *flowsv1alpha1.FlowCollector) {
fc.Spec.FlowlogsPipeline = flowsv1alpha1.FlowCollectorFLP{
Kind: "DaemonSet",
Port: 7891,
ImagePullPolicy: "Never",
LogLevel: "error",
Image: "testimg:latest",
}
fc.Spec.Loki = flowsv1alpha1.FlowCollectorLoki{}
fc.Spec.IPFIX = flowsv1alpha1.FlowCollectorIPFIX{
Sampling: 200,
}
})

By("Expecting to create the ovn-flows-configmap with the configuration from the FlowCollector")
Eventually(func() interface{} {
Expand Down Expand Up @@ -392,14 +381,9 @@ func flowCollectorControllerSpecs() {
})
})
It("Should redeploy if the spec doesn't change but the external flowlogs-pipeline-config does", func() {
Eventually(func() error {
fc := flowsv1alpha1.FlowCollector{}
if err := k8sClient.Get(ctx, crKey, &fc); err != nil {
return err
}
UpdateCR(crKey, func(fc *flowsv1alpha1.FlowCollector) {
fc.Spec.Loki.MaxRetries = 7
return k8sClient.Update(ctx, &fc)
}).Should(Succeed())
})

By("Expecting that the flowlogsPipeline.PodConfigurationDigest attribute has changed")
Eventually(func() error {
Expand All @@ -419,11 +403,7 @@ func flowCollectorControllerSpecs() {

Context("Changing kafka config", func() {
It("Should update kafka config successfully", func() {
Eventually(func() error {
fc := flowsv1alpha1.FlowCollector{}
if err := k8sClient.Get(ctx, crKey, &fc); err != nil {
return err
}
UpdateCR(crKey, func(fc *flowsv1alpha1.FlowCollector) {
fc.Spec.Kafka = flowsv1alpha1.FlowCollectorKafka{
Enable: true,
Address: "localhost:9092",
Expand All @@ -436,8 +416,7 @@ func flowCollectorControllerSpecs() {
},
},
}
return k8sClient.Update(ctx, &fc)
}).Should(Succeed())
})
})

It("Should deploy kafka ingester and transformer", func() {
Expand Down Expand Up @@ -465,14 +444,9 @@ func flowCollectorControllerSpecs() {
})

It("Should remove kafka config successfully", func() {
Eventually(func() error {
fc := flowsv1alpha1.FlowCollector{}
if err := k8sClient.Get(ctx, crKey, &fc); err != nil {
return err
}
UpdateCR(crKey, func(fc *flowsv1alpha1.FlowCollector) {
fc.Spec.Kafka.Enable = false
return k8sClient.Update(ctx, &fc)
}).Should(Succeed())
})
})

It("Should deploy single flp again", func() {
Expand All @@ -498,19 +472,14 @@ func flowCollectorControllerSpecs() {

Context("Changing namespace", func() {
It("Should update namespace successfully", func() {
Eventually(func() error {
fc := flowsv1alpha1.FlowCollector{}
if err := k8sClient.Get(ctx, crKey, &fc); err != nil {
return err
}
UpdateCR(crKey, func(fc *flowsv1alpha1.FlowCollector) {
fc.Spec.FlowlogsPipeline.Kind = "Deployment"
fc.Spec.FlowlogsPipeline.Port = 9999
fc.Spec.Namespace = otherNamespace
fc.Spec.IPFIX = flowsv1alpha1.FlowCollectorIPFIX{
Sampling: 200,
}
return k8sClient.Update(ctx, &fc)
}).Should(Succeed())
})
})

It("Should redeploy goglow-kube in new namespace", func() {
Expand Down Expand Up @@ -682,3 +651,27 @@ func flowCollectorControllerSpecs() {
})
})
}

func GetReadyCR(key types.NamespacedName) *flowsv1alpha1.FlowCollector {
cr := flowsv1alpha1.FlowCollector{}
Eventually(func() error {
err := k8sClient.Get(ctx, key, &cr)
if err != nil {
return err
}
cond := meta.FindStatusCondition(cr.Status.Conditions, conditions.TypeReady)
if cond.Status == metav1.ConditionFalse {
return fmt.Errorf("CR is not ready: %s - %v", cond.Reason, cond.Message)
}
return nil
}).Should(Succeed())
return &cr
}

func UpdateCR(key types.NamespacedName, updater func(*flowsv1alpha1.FlowCollector)) {
cr := GetReadyCR(key)
Eventually(func() error {
updater(cr)
return k8sClient.Update(ctx, cr)
}).Should(Succeed())
}
3 changes: 2 additions & 1 deletion controllers/flowlogspipeline/flp_objects.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"fmt"
"hash/fnv"
"path/filepath"
"strconv"

"github.com/netobserv/flowlogs-pipeline/pkg/api"
Expand Down Expand Up @@ -253,7 +254,7 @@ func (b *builder) obtainMetricsConfiguration() ([]api.AggregateDefinition, api.P

for _, entry := range entries {
fileName := entry.Name()
srcPath := metricsConfigDir + "/" + fileName
srcPath := filepath.Join(metricsConfigDir, fileName)

input, err := metricsConfigEmbed.ReadFile(srcPath)
if err != nil {
Expand Down

0 comments on commit 69c5e3e

Please sign in to comment.