diff --git a/controllers/flowcollector_controller_console_test.go b/controllers/flowcollector_controller_console_test.go index ca55b7e2e..3f313ddac 100644 --- a/controllers/flowcollector_controller_console_test.go +++ b/controllers/flowcollector_controller_console_test.go @@ -179,26 +179,16 @@ func flowCollectorConsolePluginSpecs() { timeout, interval).Should(Equal("http://loki:3100/")) }) It("Should update the Loki URL in the Console Plugin if it changes in the Spec", func() { - Expect(func() error { - upd := flowsv1alpha1.FlowCollector{} - if err := k8sClient.Get(ctx, crKey, &upd); err != nil { - return err - } - upd.Spec.Loki.URL = "http://loki.namespace:8888" - return k8sClient.Update(ctx, &upd) - }()).Should(Succeed()) + UpdateCR(crKey, func(fc *flowsv1alpha1.FlowCollector) { + fc.Spec.Loki.URL = "http://loki.namespace:8888" + }) Eventually(getContainerArgumentAfter("network-observability-plugin", "-loki", cpKey), timeout, interval).Should(Equal("http://loki.namespace:8888")) }) It("Should use the Loki Querier URL instead of the Loki URL, if the first is defined", func() { - Expect(func() error { - upd := flowsv1alpha1.FlowCollector{} - if err := k8sClient.Get(ctx, crKey, &upd); err != nil { - return err - } - upd.Spec.Loki.QuerierURL = "http://loki-querier:6789" - return k8sClient.Update(ctx, &upd) - }()).Should(Succeed()) + UpdateCR(crKey, func(fc *flowsv1alpha1.FlowCollector) { + fc.Spec.Loki.QuerierURL = "http://loki-querier:6789" + }) Eventually(getContainerArgumentAfter("network-observability-plugin", "-loki", cpKey), timeout, interval).Should(Equal("http://loki-querier:6789")) }) @@ -217,10 +207,9 @@ func flowCollectorConsolePluginSpecs() { It("Should be unregistered", func() { By("Update CR to unregister") - fc := flowsv1alpha1.FlowCollector{} - Expect(k8sClient.Get(ctx, crKey, &fc)).Should(Succeed()) - fc.Spec.ConsolePlugin.Register = false - Expect(k8sClient.Update(ctx, &fc)).Should(Succeed()) + UpdateCR(crKey, func(fc *flowsv1alpha1.FlowCollector) { + fc.Spec.ConsolePlugin.Register = false + }) By("Expecting the Console CR to not have plugin registered") Eventually(func() interface{} { diff --git a/controllers/flowcollector_controller_ebpf_test.go b/controllers/flowcollector_controller_ebpf_test.go index 2b56ef525..390e79922 100644 --- a/controllers/flowcollector_controller_ebpf_test.go +++ b/controllers/flowcollector_controller_ebpf_test.go @@ -129,12 +129,11 @@ func flowCollectorEBPFSpecs() { }) It("Should update fields that have changed", func() { - updated := flowsv1alpha1.FlowCollector{} - Expect(k8sClient.Get(ctx, crKey, &updated)).Should(Succeed()) - Expect(updated.Spec.EBPF.Sampling).To(Equal(int32(123))) - updated.Spec.EBPF.Sampling = 4 - updated.Spec.EBPF.Privileged = true - Expect(k8sClient.Update(ctx, &updated)).Should(Succeed()) + UpdateCR(crKey, func(fc *flowsv1alpha1.FlowCollector) { + Expect(fc.Spec.EBPF.Sampling).To(Equal(int32(123))) + fc.Spec.EBPF.Sampling = 4 + fc.Spec.EBPF.Privileged = true + }) ds := appsv1.DaemonSet{} By("expecting that the daemonset spec has eventually changed") diff --git a/controllers/flowcollector_controller_test.go b/controllers/flowcollector_controller_test.go index 6dc429860..b41ca0da6 100644 --- a/controllers/flowcollector_controller_test.go +++ b/controllers/flowcollector_controller_test.go @@ -12,6 +12,7 @@ import ( ascv2 "k8s.io/api/autoscaling/v2beta2" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/utils/pointer" @@ -20,6 +21,7 @@ import ( "github.com/netobserv/network-observability-operator/controllers/constants" . "github.com/netobserv/network-observability-operator/controllers/controllerstest" "github.com/netobserv/network-observability-operator/controllers/flowlogspipeline" + "github.com/netobserv/network-observability-operator/pkg/conditions" "github.com/netobserv/network-observability-operator/pkg/helper" ) @@ -200,16 +202,11 @@ func flowCollectorControllerSpecs() { }) It("Should update successfully", func() { - Eventually(func() error { - fc := flowsv1alpha1.FlowCollector{} - if err := k8sClient.Get(ctx, crKey, &fc); err != nil { - return err - } + UpdateCR(crKey, func(fc *flowsv1alpha1.FlowCollector) { fc.Spec.IPFIX.CacheActiveTimeout = "30s" fc.Spec.IPFIX.Sampling = 1234 fc.Spec.FlowlogsPipeline.Port = 1999 - return k8sClient.Update(ctx, &fc) - }).Should(Succeed()) + }) By("Expecting updated flowlogs-pipeline Service port") Eventually(func() interface{} { @@ -268,14 +265,9 @@ func flowCollectorControllerSpecs() { }) It("Should redeploy if the spec doesn't change but the external flowlogs-pipeline-config does", func() { - Eventually(func() error { - fc := flowsv1alpha1.FlowCollector{} - if err := k8sClient.Get(ctx, crKey, &fc); err != nil { - return err - } + UpdateCR(crKey, func(fc *flowsv1alpha1.FlowCollector) { fc.Spec.Loki.MaxRetries = 7 - return k8sClient.Update(ctx, &fc) - }).Should(Succeed()) + }) By("Expecting that the flowlogsPipeline.PodConfigurationDigest attribute has changed") Eventually(func() error { @@ -299,11 +291,10 @@ func flowCollectorControllerSpecs() { Expect(hpa.Spec.MaxReplicas).To(Equal(int32(1))) Expect(*hpa.Spec.Metrics[0].Resource.Target.AverageUtilization).To(Equal(int32(90))) // update FlowCollector and verify that HPA spec also changed - fc := flowsv1alpha1.FlowCollector{} - Expect(k8sClient.Get(ctx, crKey, &fc)).To(Succeed()) - fc.Spec.FlowlogsPipeline.HPA.MinReplicas = pointer.Int32(2) - fc.Spec.FlowlogsPipeline.HPA.MaxReplicas = 2 - Expect(k8sClient.Update(ctx, &fc)).To(Succeed()) + UpdateCR(crKey, func(fc *flowsv1alpha1.FlowCollector) { + fc.Spec.FlowlogsPipeline.HPA.MinReplicas = pointer.Int32(2) + fc.Spec.FlowlogsPipeline.HPA.MaxReplicas = 2 + }) By("Changing the Horizontal Pod Autoscaler instance") Eventually(func() error { @@ -324,21 +315,19 @@ func flowCollectorControllerSpecs() { Context("Deploying as DaemonSet", func() { var oldConfigDigest string It("Should update successfully", func() { - fc := flowsv1alpha1.FlowCollector{} - Expect(k8sClient.Get(ctx, crKey, &fc)).Should(Succeed()) - fc.Spec.FlowlogsPipeline = flowsv1alpha1.FlowCollectorFLP{ - Kind: "DaemonSet", - Port: 7891, - ImagePullPolicy: "Never", - LogLevel: "error", - Image: "testimg:latest", - } - fc.Spec.Loki = flowsv1alpha1.FlowCollectorLoki{} - fc.Spec.IPFIX = flowsv1alpha1.FlowCollectorIPFIX{ - Sampling: 200, - } - // Update - Expect(k8sClient.Update(ctx, &fc)).Should(Succeed()) + UpdateCR(crKey, func(fc *flowsv1alpha1.FlowCollector) { + fc.Spec.FlowlogsPipeline = flowsv1alpha1.FlowCollectorFLP{ + Kind: "DaemonSet", + Port: 7891, + ImagePullPolicy: "Never", + LogLevel: "error", + Image: "testimg:latest", + } + fc.Spec.Loki = flowsv1alpha1.FlowCollectorLoki{} + fc.Spec.IPFIX = flowsv1alpha1.FlowCollectorIPFIX{ + Sampling: 200, + } + }) By("Expecting to create the ovn-flows-configmap with the configuration from the FlowCollector") Eventually(func() interface{} { @@ -392,14 +381,9 @@ func flowCollectorControllerSpecs() { }) }) It("Should redeploy if the spec doesn't change but the external flowlogs-pipeline-config does", func() { - Eventually(func() error { - fc := flowsv1alpha1.FlowCollector{} - if err := k8sClient.Get(ctx, crKey, &fc); err != nil { - return err - } + UpdateCR(crKey, func(fc *flowsv1alpha1.FlowCollector) { fc.Spec.Loki.MaxRetries = 7 - return k8sClient.Update(ctx, &fc) - }).Should(Succeed()) + }) By("Expecting that the flowlogsPipeline.PodConfigurationDigest attribute has changed") Eventually(func() error { @@ -419,11 +403,7 @@ func flowCollectorControllerSpecs() { Context("Changing kafka config", func() { It("Should update kafka config successfully", func() { - Eventually(func() error { - fc := flowsv1alpha1.FlowCollector{} - if err := k8sClient.Get(ctx, crKey, &fc); err != nil { - return err - } + UpdateCR(crKey, func(fc *flowsv1alpha1.FlowCollector) { fc.Spec.Kafka = flowsv1alpha1.FlowCollectorKafka{ Enable: true, Address: "localhost:9092", @@ -436,8 +416,7 @@ func flowCollectorControllerSpecs() { }, }, } - return k8sClient.Update(ctx, &fc) - }).Should(Succeed()) + }) }) It("Should deploy kafka ingester and transformer", func() { @@ -465,14 +444,9 @@ func flowCollectorControllerSpecs() { }) It("Should remove kafka config successfully", func() { - Eventually(func() error { - fc := flowsv1alpha1.FlowCollector{} - if err := k8sClient.Get(ctx, crKey, &fc); err != nil { - return err - } + UpdateCR(crKey, func(fc *flowsv1alpha1.FlowCollector) { fc.Spec.Kafka.Enable = false - return k8sClient.Update(ctx, &fc) - }).Should(Succeed()) + }) }) It("Should deploy single flp again", func() { @@ -498,19 +472,14 @@ func flowCollectorControllerSpecs() { Context("Changing namespace", func() { It("Should update namespace successfully", func() { - Eventually(func() error { - fc := flowsv1alpha1.FlowCollector{} - if err := k8sClient.Get(ctx, crKey, &fc); err != nil { - return err - } + UpdateCR(crKey, func(fc *flowsv1alpha1.FlowCollector) { fc.Spec.FlowlogsPipeline.Kind = "Deployment" fc.Spec.FlowlogsPipeline.Port = 9999 fc.Spec.Namespace = otherNamespace fc.Spec.IPFIX = flowsv1alpha1.FlowCollectorIPFIX{ Sampling: 200, } - return k8sClient.Update(ctx, &fc) - }).Should(Succeed()) + }) }) It("Should redeploy goglow-kube in new namespace", func() { @@ -682,3 +651,27 @@ func flowCollectorControllerSpecs() { }) }) } + +func GetReadyCR(key types.NamespacedName) *flowsv1alpha1.FlowCollector { + cr := flowsv1alpha1.FlowCollector{} + Eventually(func() error { + err := k8sClient.Get(ctx, key, &cr) + if err != nil { + return err + } + cond := meta.FindStatusCondition(cr.Status.Conditions, conditions.TypeReady) + if cond.Status == metav1.ConditionFalse { + return fmt.Errorf("CR is not ready: %s - %v", cond.Reason, cond.Message) + } + return nil + }).Should(Succeed()) + return &cr +} + +func UpdateCR(key types.NamespacedName, updater func(*flowsv1alpha1.FlowCollector)) { + cr := GetReadyCR(key) + Eventually(func() error { + updater(cr) + return k8sClient.Update(ctx, cr) + }).Should(Succeed()) +} diff --git a/controllers/flowlogspipeline/flp_objects.go b/controllers/flowlogspipeline/flp_objects.go index 2a884ae2e..e077b3d01 100644 --- a/controllers/flowlogspipeline/flp_objects.go +++ b/controllers/flowlogspipeline/flp_objects.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" "hash/fnv" + "path/filepath" "strconv" "github.com/netobserv/flowlogs-pipeline/pkg/api" @@ -253,7 +254,7 @@ func (b *builder) obtainMetricsConfiguration() ([]api.AggregateDefinition, api.P for _, entry := range entries { fileName := entry.Name() - srcPath := metricsConfigDir + "/" + fileName + srcPath := filepath.Join(metricsConfigDir, fileName) input, err := metricsConfigEmbed.ReadFile(srcPath) if err != nil {