From eeae52948cfa864434bf9d77f09d33bc9b73e08b Mon Sep 17 00:00:00 2001 From: Olivier Cazade Date: Tue, 5 Apr 2022 15:48:22 +0200 Subject: [PATCH] Added kafka config in the CR --- api/v1alpha1/flowcollector_types.go | 17 ++++ api/v1alpha1/zz_generated.deepcopy.go | 20 +++++ .../flows.netobserv.io_flowcollectors.yaml | 16 ++++ controllers/flowcollector_controller.go | 2 +- controllers/flowcollector_controller_test.go | 81 +++++++++++++++++ controllers/flowlogspipeline/flp_objects.go | 12 +-- .../flowlogspipeline/flp_reconciler.go | 88 ++++++++++++++----- controllers/flowlogspipeline/flp_test.go | 58 +++++++++--- controllers/ovs/flowsconfig_reconciler.go | 8 +- .../reconcilers/namespaced_objects_manager.go | 2 +- docs/FlowCollector.md | 45 ++++++++++ 11 files changed, 301 insertions(+), 48 deletions(-) diff --git a/api/v1alpha1/flowcollector_types.go b/api/v1alpha1/flowcollector_types.go index c635f96e8..d6e0faf2e 100644 --- a/api/v1alpha1/flowcollector_types.go +++ b/api/v1alpha1/flowcollector_types.go @@ -68,6 +68,19 @@ type FlowCollectorIPFIX struct { Sampling int32 `json:"sampling,omitempty" mapstructure:"sampling,omitempty"` } +// FlowCollectorKafka defines the desired Kafka config of FlowCollector +type FlowCollectorKafka struct { + // Important: Run "make generate" to regenerate code after modifying this file + + //+kubebuilder:default:="" + // Address of the kafka server + Address string `json:"address"` + + //+kubebuilder:default:="" + // Address of the kafka topic to use + Topic string `json:"topic"` +} + // FlowCollectorFLP defines the desired flowlogs-pipeline state of FlowCollector type FlowCollectorFLP struct { // Important: Run "make generate" to regenerate code after modifying this file @@ -128,6 +141,10 @@ type FlowCollectorFLP struct { //+kubebuilder:default:=false // PrintOutput is a debug flag to print flows exported in flowlogs-pipeline stdout PrintOutput bool `json:"printOutput,omitempty"` + + // Kafka configurations, if empty the operator will deploy a all-in-one FLP + // +optional + Kafka *FlowCollectorKafka `json:"kafka,omitempty"` } type FlowCollectorHPA struct { diff --git a/api/v1alpha1/zz_generated.deepcopy.go b/api/v1alpha1/zz_generated.deepcopy.go index b52687824..ce9048237 100644 --- a/api/v1alpha1/zz_generated.deepcopy.go +++ b/api/v1alpha1/zz_generated.deepcopy.go @@ -98,6 +98,11 @@ func (in *FlowCollectorFLP) DeepCopyInto(out *FlowCollectorFLP) { (*in).DeepCopyInto(*out) } in.Resources.DeepCopyInto(&out.Resources) + if in.Kafka != nil { + in, out := &in.Kafka, &out.Kafka + *out = new(FlowCollectorKafka) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new FlowCollectorFLP. @@ -152,6 +157,21 @@ func (in *FlowCollectorIPFIX) DeepCopy() *FlowCollectorIPFIX { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *FlowCollectorKafka) DeepCopyInto(out *FlowCollectorKafka) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new FlowCollectorKafka. +func (in *FlowCollectorKafka) DeepCopy() *FlowCollectorKafka { + if in == nil { + return nil + } + out := new(FlowCollectorKafka) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *FlowCollectorList) DeepCopyInto(out *FlowCollectorList) { *out = *in diff --git a/config/crd/bases/flows.netobserv.io_flowcollectors.yaml b/config/crd/bases/flows.netobserv.io_flowcollectors.yaml index ea35c466f..2e1d48888 100644 --- a/config/crd/bases/flows.netobserv.io_flowcollectors.yaml +++ b/config/crd/bases/flows.netobserv.io_flowcollectors.yaml @@ -1180,6 +1180,22 @@ spec: - Always - Never type: string + kafka: + description: Kafka configurations, if empty the operator will + deploy a all-in-one FLP + properties: + address: + default: "" + description: Address of the kafka server + type: string + topic: + default: "" + description: Address of the kafka topic to use + type: string + required: + - address + - topic + type: object kind: default: DaemonSet description: Kind is the workload kind, either DaemonSet or Deployment diff --git a/controllers/flowcollector_controller.go b/controllers/flowcollector_controller.go index b4d9ce659..a83469393 100644 --- a/controllers/flowcollector_controller.go +++ b/controllers/flowcollector_controller.go @@ -136,7 +136,7 @@ func (r *FlowCollectorReconciler) Reconcile(ctx context.Context, req ctrl.Reques } // OVS config map for CNO - if err := ovsConfigController.Reconcile(ctx, desired); err != nil { + if err := ovsConfigController.Reconcile(ctx, desired, gfReconciler.GetServiceName(&desired.Spec.FlowlogsPipeline)); err != nil { log.Error(err, "Failed to reconcile ovs-flows-config ConfigMap") } diff --git a/controllers/flowcollector_controller_test.go b/controllers/flowcollector_controller_test.go index 557044847..3e9700555 100644 --- a/controllers/flowcollector_controller_test.go +++ b/controllers/flowcollector_controller_test.go @@ -44,6 +44,14 @@ var _ = Describe("FlowCollector Controller", func() { Name: constants.FLPName, Namespace: otherNamespace, } + gfKeyKafkaIngestor := types.NamespacedName{ + Name: constants.FLPName + flowlogspipeline.FlpConfSuffix[flowlogspipeline.ConfKafkaIngestor], + Namespace: operatorNamespace, + } + gfKeyKafkaTransformer := types.NamespacedName{ + Name: constants.FLPName + flowlogspipeline.FlpConfSuffix[flowlogspipeline.ConfKafkaTransformer], + Namespace: operatorNamespace, + } cpKey1 := types.NamespacedName{ Name: "network-observability-plugin", Namespace: operatorNamespace, @@ -437,6 +445,79 @@ var _ = Describe("FlowCollector Controller", func() { }) }) + Context("Changing kafka config", func() { + It("Should update kafka config successfully", func() { + Eventually(func() error { + fc := flowsv1alpha1.FlowCollector{} + if err := k8sClient.Get(ctx, crKey, &fc); err != nil { + return err + } + fc.Spec.FlowlogsPipeline.Kafka = &flowsv1alpha1.FlowCollectorKafka{Address: "loaclhost:9092", Topic: "FLP"} + return k8sClient.Update(ctx, &fc) + }).Should(Succeed()) + }) + + It("Should deploy kafka ingestor and transformer", func() { + By("Expecting ingestor daemonset to be created") + Eventually(func() interface{} { + return k8sClient.Get(ctx, gfKeyKafkaIngestor, &appsv1.DaemonSet{}) + }, timeout, interval).Should(Succeed()) + + By("Expecting transformer deployment to be created") + Eventually(func() interface{} { + return k8sClient.Get(ctx, gfKeyKafkaTransformer, &appsv1.Deployment{}) + }, timeout, interval).Should(Succeed()) + + By("Not Expecting transformer service to be created") + Eventually(func() interface{} { + return k8sClient.Get(ctx, gfKeyKafkaTransformer, &v1.Service{}) + }, timeout, interval).Should(MatchError(`services "flowlogs-pipeline-ktransform" not found`)) + }) + + It("Should delete previous flp deployment", func() { + By("Expecting deployment to be deleted") + Eventually(func() interface{} { + return k8sClient.Get(ctx, gfKey1, &appsv1.DaemonSet{}) + }, timeout, interval).Should(MatchError(`daemonsets.apps "flowlogs-pipeline" not found`)) + + By("Expecting service to be deleted") + Eventually(func() interface{} { + return k8sClient.Get(ctx, gfKey1, &v1.Service{}) + }, timeout, interval).Should(MatchError(`services "flowlogs-pipeline" not found`)) + }) + + It("Should remove kafka config successfully", func() { + Eventually(func() error { + fc := flowsv1alpha1.FlowCollector{} + if err := k8sClient.Get(ctx, crKey, &fc); err != nil { + return err + } + fc.Spec.FlowlogsPipeline.Kafka = nil + return k8sClient.Update(ctx, &fc) + }).Should(Succeed()) + }) + + It("Should deploy single flp again", func() { + By("Expecting daemonset to be created") + Eventually(func() interface{} { + return k8sClient.Get(ctx, gfKey1, &appsv1.DaemonSet{}) + }, timeout, interval).Should(Succeed()) + }) + + It("Should delete kafka ingestor and transformer", func() { + By("Expecting ingestor daemonset to be deleted") + Eventually(func() interface{} { + return k8sClient.Get(ctx, gfKeyKafkaIngestor, &appsv1.DaemonSet{}) + }, timeout, interval).Should(MatchError(`daemonsets.apps "flowlogs-pipeline-kingestor" not found`)) + + By("Expecting transformer deployment to be deleted") + Eventually(func() interface{} { + return k8sClient.Get(ctx, gfKeyKafkaTransformer, &appsv1.Deployment{}) + }, timeout, interval).Should(MatchError(`deployments.apps "flowlogs-pipeline-ktransform" not found`)) + }) + + }) + Context("Changing namespace", func() { It("Should update namespace successfully", func() { Eventually(func() error { diff --git a/controllers/flowlogspipeline/flp_objects.go b/controllers/flowlogspipeline/flp_objects.go index 508c88dee..c4f1054b1 100644 --- a/controllers/flowlogspipeline/flp_objects.go +++ b/controllers/flowlogspipeline/flp_objects.go @@ -37,10 +37,10 @@ const ( ConfKafkaTransformer = "kafkaTransformer" ) -var flpConfSuffix = map[string]string{ +var FlpConfSuffix = map[string]string{ ConfSingle: "", ConfKafkaIngestor: "-kingestor", - ConfKafkaTransformer: "-ktransformer", + ConfKafkaTransformer: "-ktransform", } // PodConfigurationDigest is an annotation name to facilitate pod restart after @@ -61,15 +61,15 @@ func newBuilder(ns string, desired *flowsv1alpha1.FlowCollectorFLP, desiredLoki return builder{ namespace: ns, labels: map[string]string{ - "app": constants.FLPName + flpConfSuffix[confKind], + "app": constants.FLPName + FlpConfSuffix[confKind], "version": version, }, selector: map[string]string{ - "app": constants.FLPName + flpConfSuffix[confKind], + "app": constants.FLPName + FlpConfSuffix[confKind], }, desired: desired, desiredLoki: desiredLoki, - confKindSuffix: flpConfSuffix[confKind], + confKindSuffix: FlpConfSuffix[confKind], } } @@ -349,7 +349,7 @@ func (b *builder) autoScaler() *ascv2.HorizontalPodAutoscaler { func buildAppLabel(confKind string) map[string]string { return map[string]string{ - "app": constants.FLPName + flpConfSuffix[confKind], + "app": constants.FLPName + FlpConfSuffix[confKind], } } diff --git a/controllers/flowlogspipeline/flp_reconciler.go b/controllers/flowlogspipeline/flp_reconciler.go index 815e1b5e0..a961d0fc7 100644 --- a/controllers/flowlogspipeline/flp_reconciler.go +++ b/controllers/flowlogspipeline/flp_reconciler.go @@ -51,6 +51,8 @@ func NewReconciler(cl reconcilers.ClientHelper, ns, prevNS string) FLPReconciler flpReconciler := FLPReconciler{ClientHelper: cl, nobjMngr: nobjMngr, owned: owned} flpReconciler.singleReconcilers = append(flpReconciler.singleReconcilers, newSingleReconciler(cl, ns, prevNS, ConfSingle)) + flpReconciler.singleReconcilers = append(flpReconciler.singleReconcilers, newSingleReconciler(cl, ns, prevNS, ConfKafkaIngestor)) + flpReconciler.singleReconcilers = append(flpReconciler.singleReconcilers, newSingleReconciler(cl, ns, prevNS, ConfKafkaTransformer)) return flpReconciler } @@ -64,12 +66,11 @@ func newSingleReconciler(cl reconcilers.ClientHelper, ns string, prevNS string, configMap: &corev1.ConfigMap{}, } nobjMngr := reconcilers.NewNamespacedObjectManager(cl, ns, prevNS) - nobjMngr.AddManagedObject(constants.FLPName, owned.deployment) - nobjMngr.AddManagedObject(constants.FLPName, owned.daemonSet) - nobjMngr.AddManagedObject(constants.FLPName, owned.service) - nobjMngr.AddManagedObject(constants.FLPName, owned.hpa) - nobjMngr.AddManagedObject(constants.FLPName, owned.serviceAccount) - nobjMngr.AddManagedObject(configMapName, owned.configMap) + nobjMngr.AddManagedObject(constants.FLPName+FlpConfSuffix[confKind], owned.deployment) + nobjMngr.AddManagedObject(constants.FLPName+FlpConfSuffix[confKind], owned.daemonSet) + nobjMngr.AddManagedObject(constants.FLPName+FlpConfSuffix[confKind], owned.service) + nobjMngr.AddManagedObject(constants.FLPName+FlpConfSuffix[confKind], owned.hpa) + nobjMngr.AddManagedObject(configMapName+FlpConfSuffix[confKind], owned.configMap) return singleFLPReconciler{ClientHelper: cl, nobjMngr: nobjMngr, owned: owned, confKind: confKind} } @@ -99,6 +100,13 @@ func validateDesired(desired *flpSpec) error { return nil } +func (r *FLPReconciler) GetServiceName(desiredFLP *flpSpec) string { + if single, _ := checkDeployNeeded(desiredFLP, ConfKafkaIngestor); single { + return constants.FLPName + FlpConfSuffix[ConfKafkaIngestor] + } + return constants.FLPName + FlpConfSuffix[ConfSingle] +} + func (r *FLPReconciler) Reconcile(ctx context.Context, desiredFLP *flpSpec, desiredLoki *lokiSpec) error { for _, singleFlp := range r.singleReconcilers { err := singleFlp.Reconcile(ctx, desiredFLP, desiredLoki) @@ -109,6 +117,21 @@ func (r *FLPReconciler) Reconcile(ctx context.Context, desiredFLP *flpSpec, desi return nil } +// Check if a configKind should be deployed +func checkDeployNeeded(desiredFLP *flpSpec, confKind string) (bool, error) { + switch confKind { + case ConfSingle: + return desiredFLP.Kafka == nil, nil + case ConfKafkaTransformer: + return desiredFLP.Kafka != nil, nil + case ConfKafkaIngestor: + //TODO should be disabled if ebpf-agent is enabled with kafka + return desiredFLP.Kafka != nil, nil + default: + return false, fmt.Errorf("unknown flowlogs-pipelines config kind") + } +} + // Reconcile is the reconciler entry point to reconcile the current flowlogs-pipeline state with the desired configuration func (r *singleFLPReconciler) Reconcile(ctx context.Context, desiredFLP *flpSpec, desiredLoki *lokiSpec) error { err := validateDesired(desiredFLP) @@ -116,6 +139,15 @@ func (r *singleFLPReconciler) Reconcile(ctx context.Context, desiredFLP *flpSpec return err } + shouldDeploy, err := checkDeployNeeded(desiredFLP, r.confKind) + if err != nil { + return err + } + if !shouldDeploy { + r.nobjMngr.CleanupNamespace(ctx) + return nil + } + builder := newBuilder(r.nobjMngr.Namespace, desiredFLP, desiredLoki, r.confKind) // Retrieve current owned objects err = r.nobjMngr.FetchAll(ctx) @@ -137,6 +169,9 @@ func (r *singleFLPReconciler) Reconcile(ctx context.Context, desiredFLP *flpSpec case constants.DeploymentKind: return r.reconcileAsDeployment(ctx, desiredFLP, &builder, configDigest) case constants.DaemonSetKind: + if r.confKind == ConfKafkaTransformer { + return r.reconcileAsDeployment(ctx, desiredFLP, &builder, configDigest) + } return r.reconcileAsDaemonSet(ctx, desiredFLP, &builder, configDigest) default: return fmt.Errorf("could not reconcile collector, invalid kind: %s", desiredFLP.Kind) @@ -152,19 +187,13 @@ func (r *singleFLPReconciler) reconcileAsDeployment(ctx context.Context, desired if err := r.CreateOwned(ctx, builder.deployment(configDigest)); err != nil { return err } - } else if deploymentNeedsUpdate(r.owned.deployment, desiredFLP, configDigest) { + } else if deploymentNeedsUpdate(r.owned.deployment, desiredFLP, configDigest, constants.FLPName+FlpConfSuffix[r.confKind]) { if err := r.UpdateOwned(ctx, r.owned.deployment, builder.deployment(configDigest)); err != nil { return err } } - if !r.nobjMngr.Exists(r.owned.service) { - newSVC := builder.service(nil) - if err := r.CreateOwned(ctx, newSVC); err != nil { - return err - } - } else if serviceNeedsUpdate(r.owned.service, desiredFLP) { - newSVC := builder.service(r.owned.service) - if err := r.UpdateOwned(ctx, r.owned.service, newSVC); err != nil { + if r.confKind != ConfKafkaTransformer { + if err := r.reconcileAsService(ctx, desiredFLP, builder); err != nil { return err } } @@ -187,6 +216,21 @@ func (r *singleFLPReconciler) reconcileAsDeployment(ctx context.Context, desired return nil } +func (r *singleFLPReconciler) reconcileAsService(ctx context.Context, desiredFLP *flpSpec, builder *builder) error { + if !r.nobjMngr.Exists(r.owned.service) { + newSVC := builder.service(nil) + if err := r.CreateOwned(ctx, newSVC); err != nil { + return err + } + } else if serviceNeedsUpdate(r.owned.service, desiredFLP) { + newSVC := builder.service(r.owned.service) + if err := r.UpdateOwned(ctx, r.owned.service, newSVC); err != nil { + return err + } + } + return nil +} + func (r *singleFLPReconciler) reconcileAsDaemonSet(ctx context.Context, desiredFLP *flpSpec, builder *builder, configDigest string) error { // Kind may have changed: try delete Deployment / Service / HPA and create DaemonSet r.nobjMngr.TryDelete(ctx, r.owned.deployment) @@ -194,7 +238,7 @@ func (r *singleFLPReconciler) reconcileAsDaemonSet(ctx context.Context, desiredF r.nobjMngr.TryDelete(ctx, r.owned.hpa) if !r.nobjMngr.Exists(r.owned.daemonSet) { return r.CreateOwned(ctx, builder.daemonSet(configDigest)) - } else if daemonSetNeedsUpdate(r.owned.daemonSet, desiredFLP, configDigest) { + } else if daemonSetNeedsUpdate(r.owned.daemonSet, desiredFLP, configDigest, constants.FLPName+FlpConfSuffix[r.confKind]) { return r.UpdateOwned(ctx, r.owned.daemonSet, builder.daemonSet(configDigest)) } return nil @@ -224,13 +268,13 @@ func (r *FLPReconciler) createPermissions(ctx context.Context, firstInstall bool return nil } -func daemonSetNeedsUpdate(ds *appsv1.DaemonSet, desired *flpSpec, configDigest string) bool { - return containerNeedsUpdate(&ds.Spec.Template.Spec, desired, true) || +func daemonSetNeedsUpdate(ds *appsv1.DaemonSet, desired *flpSpec, configDigest string, name string) bool { + return containerNeedsUpdate(&ds.Spec.Template.Spec, desired, true, name) || configChanged(&ds.Spec.Template, configDigest) } -func deploymentNeedsUpdate(depl *appsv1.Deployment, desired *flpSpec, configDigest string) bool { - return containerNeedsUpdate(&depl.Spec.Template.Spec, desired, false) || +func deploymentNeedsUpdate(depl *appsv1.Deployment, desired *flpSpec, configDigest string, name string) bool { + return containerNeedsUpdate(&depl.Spec.Template.Spec, desired, false, name) || configChanged(&depl.Spec.Template, configDigest) || (desired.HPA == nil && *depl.Spec.Replicas != desired.Replicas) } @@ -248,10 +292,10 @@ func serviceNeedsUpdate(svc *corev1.Service, desired *flpSpec) bool { return true } -func containerNeedsUpdate(podSpec *corev1.PodSpec, desired *flpSpec, expectHostPort bool) bool { +func containerNeedsUpdate(podSpec *corev1.PodSpec, desired *flpSpec, expectHostPort bool, name string) bool { // Note, we don't check for changed port / host port here, because that would change also the configmap, // which also triggers pod update anyway - container := reconcilers.FindContainer(podSpec, constants.FLPName) + container := reconcilers.FindContainer(podSpec, name) return container == nil || desired.Image != container.Image || desired.ImagePullPolicy != string(container.ImagePullPolicy) || diff --git a/controllers/flowlogspipeline/flp_test.go b/controllers/flowlogspipeline/flp_test.go index 6182bb88b..1ec8c4b0b 100644 --- a/controllers/flowlogspipeline/flp_test.go +++ b/controllers/flowlogspipeline/flp_test.go @@ -148,7 +148,7 @@ func TestDaemonSetNoChange(t *testing.T) { b = newBuilder(ns, &flp, &loki, ConfSingle) _, digest = b.configMap() - assert.False(daemonSetNeedsUpdate(first, &flp, digest)) + assert.False(daemonSetNeedsUpdate(first, &flp, digest, constants.FLPName+FlpConfSuffix[ConfSingle])) } func TestDaemonSetChanged(t *testing.T) { @@ -168,7 +168,7 @@ func TestDaemonSetChanged(t *testing.T) { _, digest = b.configMap() second := b.daemonSet(digest) - assert.True(daemonSetNeedsUpdate(first, &flp, digest)) + assert.True(daemonSetNeedsUpdate(first, &flp, digest, constants.FLPName+FlpConfSuffix[ConfSingle])) // Check log level change flp.LogLevel = "info" @@ -176,7 +176,7 @@ func TestDaemonSetChanged(t *testing.T) { _, digest = b.configMap() third := b.daemonSet(digest) - assert.True(daemonSetNeedsUpdate(second, &flp, digest)) + assert.True(daemonSetNeedsUpdate(second, &flp, digest, constants.FLPName+FlpConfSuffix[ConfSingle])) // Check resource change flp.Resources.Limits = map[corev1.ResourceName]resource.Quantity{ @@ -187,7 +187,7 @@ func TestDaemonSetChanged(t *testing.T) { _, digest = b.configMap() fourth := b.daemonSet(digest) - assert.True(daemonSetNeedsUpdate(third, &flp, digest)) + assert.True(daemonSetNeedsUpdate(third, &flp, digest, constants.FLPName+FlpConfSuffix[ConfSingle])) // Check reverting limits flp.Resources.Limits = map[corev1.ResourceName]resource.Quantity{ @@ -197,8 +197,8 @@ func TestDaemonSetChanged(t *testing.T) { b = newBuilder(ns, &flp, &loki, ConfSingle) _, digest = b.configMap() - assert.True(daemonSetNeedsUpdate(fourth, &flp, digest)) - assert.False(daemonSetNeedsUpdate(third, &flp, digest)) + assert.True(daemonSetNeedsUpdate(fourth, &flp, digest, constants.FLPName+FlpConfSuffix[ConfSingle])) + assert.False(daemonSetNeedsUpdate(third, &flp, digest, constants.FLPName+FlpConfSuffix[ConfSingle])) } func TestDeploymentNoChange(t *testing.T) { @@ -218,7 +218,7 @@ func TestDeploymentNoChange(t *testing.T) { b = newBuilder(ns, &flp, &loki, ConfSingle) _, digest = b.configMap() - assert.False(deploymentNeedsUpdate(first, &flp, digest)) + assert.False(deploymentNeedsUpdate(first, &flp, digest, constants.FLPName+FlpConfSuffix[ConfSingle])) } func TestDeploymentChanged(t *testing.T) { @@ -238,7 +238,7 @@ func TestDeploymentChanged(t *testing.T) { _, digest = b.configMap() second := b.deployment(digest) - assert.True(deploymentNeedsUpdate(first, &flp, digest)) + assert.True(deploymentNeedsUpdate(first, &flp, digest, constants.FLPName+FlpConfSuffix[ConfSingle])) // Check log level change flp.LogLevel = "info" @@ -246,7 +246,7 @@ func TestDeploymentChanged(t *testing.T) { _, digest = b.configMap() third := b.deployment(digest) - assert.True(deploymentNeedsUpdate(second, &flp, digest)) + assert.True(deploymentNeedsUpdate(second, &flp, digest, constants.FLPName+FlpConfSuffix[ConfSingle])) // Check resource change flp.Resources.Limits = map[corev1.ResourceName]resource.Quantity{ @@ -257,7 +257,7 @@ func TestDeploymentChanged(t *testing.T) { _, digest = b.configMap() fourth := b.deployment(digest) - assert.True(deploymentNeedsUpdate(third, &flp, digest)) + assert.True(deploymentNeedsUpdate(third, &flp, digest, constants.FLPName+FlpConfSuffix[ConfSingle])) // Check reverting limits flp.Resources.Limits = map[corev1.ResourceName]resource.Quantity{ @@ -268,8 +268,8 @@ func TestDeploymentChanged(t *testing.T) { _, digest = b.configMap() fifth := b.deployment(digest) - assert.True(deploymentNeedsUpdate(fourth, &flp, digest)) - assert.False(deploymentNeedsUpdate(third, &flp, digest)) + assert.True(deploymentNeedsUpdate(fourth, &flp, digest, constants.FLPName+FlpConfSuffix[ConfSingle])) + assert.False(deploymentNeedsUpdate(third, &flp, digest, constants.FLPName+FlpConfSuffix[ConfSingle])) // Check replicas didn't change because HPA is used flp2 := flp @@ -277,7 +277,7 @@ func TestDeploymentChanged(t *testing.T) { b = newBuilder(ns, &flp2, &loki, ConfSingle) _, digest = b.configMap() - assert.False(deploymentNeedsUpdate(fifth, &flp2, digest)) + assert.False(deploymentNeedsUpdate(fifth, &flp2, digest, constants.FLPName+FlpConfSuffix[ConfSingle])) } func TestDeploymentChangedReplicasNoHPA(t *testing.T) { @@ -297,7 +297,7 @@ func TestDeploymentChangedReplicasNoHPA(t *testing.T) { b = newBuilder(ns, &flp2, &loki, ConfSingle) _, digest = b.configMap() - assert.True(deploymentNeedsUpdate(first, &flp2, digest)) + assert.True(deploymentNeedsUpdate(first, &flp2, digest, constants.FLPName+FlpConfSuffix[ConfSingle])) } func TestServiceNoChange(t *testing.T) { @@ -434,3 +434,33 @@ func TestLabels(t *testing.T) { assert.Equal("dev", svc.Labels["version"]) assert.Empty(svc.Spec.Selector["version"]) } + +func TestDeployNeeded(t *testing.T) { + assert := assert.New(t) + + flp := getFLPConfig() + + // Kafka not configured + res, err := checkDeployNeeded(&flp, ConfSingle) + assert.True(res) + assert.NoError(err) + res, err = checkDeployNeeded(&flp, ConfKafkaIngestor) + assert.False(res) + assert.NoError(err) + res, err = checkDeployNeeded(&flp, ConfKafkaTransformer) + assert.False(res) + assert.NoError(err) + + // Kafka not configured + flp.Kafka = &flowsv1alpha1.FlowCollectorKafka{Address: "loaclhost:9092", Topic: "FLP"} + res, err = checkDeployNeeded(&flp, ConfSingle) + assert.False(res) + assert.NoError(err) + res, err = checkDeployNeeded(&flp, ConfKafkaIngestor) + assert.True(res) + assert.NoError(err) + res, err = checkDeployNeeded(&flp, ConfKafkaTransformer) + assert.True(res) + assert.NoError(err) + +} diff --git a/controllers/ovs/flowsconfig_reconciler.go b/controllers/ovs/flowsconfig_reconciler.go index e36599632..2aac77318 100644 --- a/controllers/ovs/flowsconfig_reconciler.go +++ b/controllers/ovs/flowsconfig_reconciler.go @@ -40,14 +40,14 @@ func NewFlowsConfigController(client reconcilers.ClientHelper, // Reconcile reconciles the status of the ovs-flows-config configmap with // the target FlowCollector ipfix section map func (c *FlowsConfigController) Reconcile( - ctx context.Context, target *flowsv1alpha1.FlowCollector) error { + ctx context.Context, target *flowsv1alpha1.FlowCollector, flpServiceName string) error { rlog := log.FromContext(ctx, "component", "FlowsConfigController") current, err := c.current(ctx) if err != nil { return err } - desired, err := c.desired(ctx, target) + desired, err := c.desired(ctx, target, flpServiceName) // compare current and desired if err != nil { return err @@ -95,7 +95,7 @@ func (c *FlowsConfigController) current(ctx context.Context) (*flowsConfig, erro } func (c *FlowsConfigController) desired( - ctx context.Context, coll *flowsv1alpha1.FlowCollector) (*flowsConfig, error) { + ctx context.Context, coll *flowsv1alpha1.FlowCollector, flpServiceName string) (*flowsConfig, error) { conf := flowsConfig{FlowCollectorIPFIX: coll.Spec.IPFIX} @@ -110,7 +110,7 @@ func (c *FlowsConfigController) desired( svc := corev1.Service{} if err := c.client.Get(ctx, types.NamespacedName{ Namespace: c.collectorNamespace, - Name: constants.FLPName, + Name: flpServiceName, }, &svc); err != nil { return nil, fmt.Errorf("can't get service %s in %s: %w", constants.FLPName, c.collectorNamespace, err) } diff --git a/controllers/reconcilers/namespaced_objects_manager.go b/controllers/reconcilers/namespaced_objects_manager.go index 55ac98f80..f3ad8cac9 100644 --- a/controllers/reconcilers/namespaced_objects_manager.go +++ b/controllers/reconcilers/namespaced_objects_manager.go @@ -75,7 +75,7 @@ func (m *NamespacedObjectManager) CleanupNamespace(ctx context.Context) { ref.SetNamespace(namespace) log.Info("Deleting old "+obj.kind, "Namespace", namespace, "Name", obj.name) err := m.client.Delete(ctx, ref) - if err != nil { + if client.IgnoreNotFound(err) != nil { log.Error(err, "Failed to delete old "+obj.kind, "Namespace", namespace, "Name", obj.name) } } diff --git a/docs/FlowCollector.md b/docs/FlowCollector.md index 026bca114..63fd430a4 100644 --- a/docs/FlowCollector.md +++ b/docs/FlowCollector.md @@ -1244,6 +1244,13 @@ FlowlogsPipeline contains settings related to the flowlogs-pipeline component Default: IfNotPresent
false + + kafka + object + + Kafka configurations, if empty the operator will deploy a all-in-one FLP
+ + false kind enum @@ -2218,6 +2225,44 @@ target specifies the target value for the given metric +### FlowCollector.spec.flowlogsPipeline.kafka +[↩ Parent](#flowcollectorspecflowlogspipeline) + + + +Kafka configurations, if empty the operator will deploy a all-in-one FLP + + + + + + + + + + + + + + + + + + + + + +
NameTypeDescriptionRequired
addressstring + Address of the kafka server
+
+ Default:
+
true
topicstring + Address of the kafka topic to use
+
+ Default:
+
true
+ + ### FlowCollector.spec.flowlogsPipeline.resources [↩ Parent](#flowcollectorspecflowlogspipeline)