diff --git a/api/v1alpha1/flowcollector_types.go b/api/v1alpha1/flowcollector_types.go
index c635f96e8..d6e0faf2e 100644
--- a/api/v1alpha1/flowcollector_types.go
+++ b/api/v1alpha1/flowcollector_types.go
@@ -68,6 +68,19 @@ type FlowCollectorIPFIX struct {
Sampling int32 `json:"sampling,omitempty" mapstructure:"sampling,omitempty"`
}
+// FlowCollectorKafka defines the desired Kafka config of FlowCollector
+type FlowCollectorKafka struct {
+ // Important: Run "make generate" to regenerate code after modifying this file
+
+ //+kubebuilder:default:=""
+ // Address of the Kafka server
+ Address string `json:"address"`
+
+ //+kubebuilder:default:=""
+ // Name of the Kafka topic to use
+ Topic string `json:"topic"`
+}
+
// FlowCollectorFLP defines the desired flowlogs-pipeline state of FlowCollector
type FlowCollectorFLP struct {
// Important: Run "make generate" to regenerate code after modifying this file
@@ -128,6 +141,10 @@ type FlowCollectorFLP struct {
//+kubebuilder:default:=false
// PrintOutput is a debug flag to print flows exported in flowlogs-pipeline stdout
PrintOutput bool `json:"printOutput,omitempty"`
+
+ // Kafka configuration, if empty the operator will deploy an all-in-one FLP
+ // +optional
+ Kafka *FlowCollectorKafka `json:"kafka,omitempty"`
}
type FlowCollectorHPA struct {
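For orientation (not part of the diff), a minimal sketch of how the new field is meant to be populated; the module import path and the address/topic values are assumptions, not taken from this change:

```go
package main

import (
	"fmt"

	flowsv1alpha1 "github.com/netobserv/network-observability-operator/api/v1alpha1"
)

func main() {
	// Leaving Kafka nil keeps the all-in-one flowlogs-pipeline deployment;
	// setting it switches the operator to the ingestor/transformer split.
	flp := flowsv1alpha1.FlowCollectorFLP{
		Kafka: &flowsv1alpha1.FlowCollectorKafka{
			Address: "kafka-bootstrap.netobserv:9092", // hypothetical bootstrap address
			Topic:   "network-flows",                  // hypothetical topic name
		},
	}
	fmt.Println(flp.Kafka.Address, flp.Kafka.Topic)
}
```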
diff --git a/api/v1alpha1/zz_generated.deepcopy.go b/api/v1alpha1/zz_generated.deepcopy.go
index b52687824..ce9048237 100644
--- a/api/v1alpha1/zz_generated.deepcopy.go
+++ b/api/v1alpha1/zz_generated.deepcopy.go
@@ -98,6 +98,11 @@ func (in *FlowCollectorFLP) DeepCopyInto(out *FlowCollectorFLP) {
(*in).DeepCopyInto(*out)
}
in.Resources.DeepCopyInto(&out.Resources)
+ if in.Kafka != nil {
+ in, out := &in.Kafka, &out.Kafka
+ *out = new(FlowCollectorKafka)
+ **out = **in
+ }
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new FlowCollectorFLP.
@@ -152,6 +157,21 @@ func (in *FlowCollectorIPFIX) DeepCopy() *FlowCollectorIPFIX {
return out
}
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *FlowCollectorKafka) DeepCopyInto(out *FlowCollectorKafka) {
+ *out = *in
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new FlowCollectorKafka.
+func (in *FlowCollectorKafka) DeepCopy() *FlowCollectorKafka {
+ if in == nil {
+ return nil
+ }
+ out := new(FlowCollectorKafka)
+ in.DeepCopyInto(out)
+ return out
+}
+
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *FlowCollectorList) DeepCopyInto(out *FlowCollectorList) {
*out = *in
diff --git a/config/crd/bases/flows.netobserv.io_flowcollectors.yaml b/config/crd/bases/flows.netobserv.io_flowcollectors.yaml
index ea35c466f..2e1d48888 100644
--- a/config/crd/bases/flows.netobserv.io_flowcollectors.yaml
+++ b/config/crd/bases/flows.netobserv.io_flowcollectors.yaml
@@ -1180,6 +1180,22 @@ spec:
- Always
- Never
type: string
+ kafka:
+ description: Kafka configuration, if empty the operator will
+ deploy an all-in-one FLP
+ properties:
+ address:
+ default: ""
+ description: Address of the Kafka server
+ type: string
+ topic:
+ default: ""
+ description: Name of the Kafka topic to use
+ type: string
+ required:
+ - address
+ - topic
+ type: object
kind:
default: DaemonSet
description: Kind is the workload kind, either DaemonSet or Deployment
diff --git a/controllers/flowcollector_controller.go b/controllers/flowcollector_controller.go
index b4d9ce659..a83469393 100644
--- a/controllers/flowcollector_controller.go
+++ b/controllers/flowcollector_controller.go
@@ -136,7 +136,7 @@ func (r *FlowCollectorReconciler) Reconcile(ctx context.Context, req ctrl.Reques
}
// OVS config map for CNO
- if err := ovsConfigController.Reconcile(ctx, desired); err != nil {
+ if err := ovsConfigController.Reconcile(ctx, desired, gfReconciler.GetServiceName(&desired.Spec.FlowlogsPipeline)); err != nil {
log.Error(err, "Failed to reconcile ovs-flows-config ConfigMap")
}
diff --git a/controllers/flowcollector_controller_test.go b/controllers/flowcollector_controller_test.go
index 557044847..3e9700555 100644
--- a/controllers/flowcollector_controller_test.go
+++ b/controllers/flowcollector_controller_test.go
@@ -44,6 +44,14 @@ var _ = Describe("FlowCollector Controller", func() {
Name: constants.FLPName,
Namespace: otherNamespace,
}
+ gfKeyKafkaIngestor := types.NamespacedName{
+ Name: constants.FLPName + flowlogspipeline.FlpConfSuffix[flowlogspipeline.ConfKafkaIngestor],
+ Namespace: operatorNamespace,
+ }
+ gfKeyKafkaTransformer := types.NamespacedName{
+ Name: constants.FLPName + flowlogspipeline.FlpConfSuffix[flowlogspipeline.ConfKafkaTransformer],
+ Namespace: operatorNamespace,
+ }
cpKey1 := types.NamespacedName{
Name: "network-observability-plugin",
Namespace: operatorNamespace,
@@ -437,6 +445,79 @@ var _ = Describe("FlowCollector Controller", func() {
})
})
+ Context("Changing kafka config", func() {
+ It("Should update kafka config successfully", func() {
+ Eventually(func() error {
+ fc := flowsv1alpha1.FlowCollector{}
+ if err := k8sClient.Get(ctx, crKey, &fc); err != nil {
+ return err
+ }
+ fc.Spec.FlowlogsPipeline.Kafka = &flowsv1alpha1.FlowCollectorKafka{Address: "localhost:9092", Topic: "FLP"}
+ return k8sClient.Update(ctx, &fc)
+ }).Should(Succeed())
+ })
+
+ It("Should deploy kafka ingestor and transformer", func() {
+ By("Expecting ingestor daemonset to be created")
+ Eventually(func() interface{} {
+ return k8sClient.Get(ctx, gfKeyKafkaIngestor, &appsv1.DaemonSet{})
+ }, timeout, interval).Should(Succeed())
+
+ By("Expecting transformer deployment to be created")
+ Eventually(func() interface{} {
+ return k8sClient.Get(ctx, gfKeyKafkaTransformer, &appsv1.Deployment{})
+ }, timeout, interval).Should(Succeed())
+
+ By("Not Expecting transformer service to be created")
+ Eventually(func() interface{} {
+ return k8sClient.Get(ctx, gfKeyKafkaTransformer, &v1.Service{})
+ }, timeout, interval).Should(MatchError(`services "flowlogs-pipeline-ktransform" not found`))
+ })
+
+ It("Should delete previous flp deployment", func() {
+ By("Expecting deployment to be deleted")
+ Eventually(func() interface{} {
+ return k8sClient.Get(ctx, gfKey1, &appsv1.DaemonSet{})
+ }, timeout, interval).Should(MatchError(`daemonsets.apps "flowlogs-pipeline" not found`))
+
+ By("Expecting service to be deleted")
+ Eventually(func() interface{} {
+ return k8sClient.Get(ctx, gfKey1, &v1.Service{})
+ }, timeout, interval).Should(MatchError(`services "flowlogs-pipeline" not found`))
+ })
+
+ It("Should remove kafka config successfully", func() {
+ Eventually(func() error {
+ fc := flowsv1alpha1.FlowCollector{}
+ if err := k8sClient.Get(ctx, crKey, &fc); err != nil {
+ return err
+ }
+ fc.Spec.FlowlogsPipeline.Kafka = nil
+ return k8sClient.Update(ctx, &fc)
+ }).Should(Succeed())
+ })
+
+ It("Should deploy single flp again", func() {
+ By("Expecting daemonset to be created")
+ Eventually(func() interface{} {
+ return k8sClient.Get(ctx, gfKey1, &appsv1.DaemonSet{})
+ }, timeout, interval).Should(Succeed())
+ })
+
+ It("Should delete kafka ingestor and transformer", func() {
+ By("Expecting ingestor daemonset to be deleted")
+ Eventually(func() interface{} {
+ return k8sClient.Get(ctx, gfKeyKafkaIngestor, &appsv1.DaemonSet{})
+ }, timeout, interval).Should(MatchError(`daemonsets.apps "flowlogs-pipeline-kingestor" not found`))
+
+ By("Expecting transformer deployment to be deleted")
+ Eventually(func() interface{} {
+ return k8sClient.Get(ctx, gfKeyKafkaTransformer, &appsv1.Deployment{})
+ }, timeout, interval).Should(MatchError(`deployments.apps "flowlogs-pipeline-ktransform" not found`))
+ })
+
+ })
+
Context("Changing namespace", func() {
It("Should update namespace successfully", func() {
Eventually(func() error {
diff --git a/controllers/flowlogspipeline/flp_objects.go b/controllers/flowlogspipeline/flp_objects.go
index 508c88dee..c4f1054b1 100644
--- a/controllers/flowlogspipeline/flp_objects.go
+++ b/controllers/flowlogspipeline/flp_objects.go
@@ -37,10 +37,10 @@ const (
ConfKafkaTransformer = "kafkaTransformer"
)
-var flpConfSuffix = map[string]string{
+var FlpConfSuffix = map[string]string{
ConfSingle: "",
ConfKafkaIngestor: "-kingestor",
- ConfKafkaTransformer: "-ktransformer",
+ ConfKafkaTransformer: "-ktransform",
}
// PodConfigurationDigest is an annotation name to facilitate pod restart after
@@ -61,15 +61,15 @@ func newBuilder(ns string, desired *flowsv1alpha1.FlowCollectorFLP, desiredLoki
return builder{
namespace: ns,
labels: map[string]string{
- "app": constants.FLPName + flpConfSuffix[confKind],
+ "app": constants.FLPName + FlpConfSuffix[confKind],
"version": version,
},
selector: map[string]string{
- "app": constants.FLPName + flpConfSuffix[confKind],
+ "app": constants.FLPName + FlpConfSuffix[confKind],
},
desired: desired,
desiredLoki: desiredLoki,
- confKindSuffix: flpConfSuffix[confKind],
+ confKindSuffix: FlpConfSuffix[confKind],
}
}
@@ -349,7 +349,7 @@ func (b *builder) autoScaler() *ascv2.HorizontalPodAutoscaler {
func buildAppLabel(confKind string) map[string]string {
return map[string]string{
- "app": constants.FLPName + flpConfSuffix[confKind],
+ "app": constants.FLPName + FlpConfSuffix[confKind],
}
}
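As a reading aid (not part of the diff), a package-internal sketch of the object names this exported map now produces, assuming constants.FLPName is "flowlogs-pipeline" as the test expectations in this change indicate:

```go
// exampleObjectNames is a hypothetical helper, shown only to spell out the
// names used for the per-kind workloads, services and config maps.
func exampleObjectNames() []string {
	return []string{
		constants.FLPName + FlpConfSuffix[ConfSingle],           // "flowlogs-pipeline"
		constants.FLPName + FlpConfSuffix[ConfKafkaIngestor],    // "flowlogs-pipeline-kingestor"
		constants.FLPName + FlpConfSuffix[ConfKafkaTransformer], // "flowlogs-pipeline-ktransform"
	}
}
```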
diff --git a/controllers/flowlogspipeline/flp_reconciler.go b/controllers/flowlogspipeline/flp_reconciler.go
index 815e1b5e0..a961d0fc7 100644
--- a/controllers/flowlogspipeline/flp_reconciler.go
+++ b/controllers/flowlogspipeline/flp_reconciler.go
@@ -51,6 +51,8 @@ func NewReconciler(cl reconcilers.ClientHelper, ns, prevNS string) FLPReconciler
flpReconciler := FLPReconciler{ClientHelper: cl, nobjMngr: nobjMngr, owned: owned}
flpReconciler.singleReconcilers = append(flpReconciler.singleReconcilers, newSingleReconciler(cl, ns, prevNS, ConfSingle))
+ flpReconciler.singleReconcilers = append(flpReconciler.singleReconcilers, newSingleReconciler(cl, ns, prevNS, ConfKafkaIngestor))
+ flpReconciler.singleReconcilers = append(flpReconciler.singleReconcilers, newSingleReconciler(cl, ns, prevNS, ConfKafkaTransformer))
return flpReconciler
}
@@ -64,12 +66,11 @@ func newSingleReconciler(cl reconcilers.ClientHelper, ns string, prevNS string,
configMap: &corev1.ConfigMap{},
}
nobjMngr := reconcilers.NewNamespacedObjectManager(cl, ns, prevNS)
- nobjMngr.AddManagedObject(constants.FLPName, owned.deployment)
- nobjMngr.AddManagedObject(constants.FLPName, owned.daemonSet)
- nobjMngr.AddManagedObject(constants.FLPName, owned.service)
- nobjMngr.AddManagedObject(constants.FLPName, owned.hpa)
- nobjMngr.AddManagedObject(constants.FLPName, owned.serviceAccount)
- nobjMngr.AddManagedObject(configMapName, owned.configMap)
+ nobjMngr.AddManagedObject(constants.FLPName+FlpConfSuffix[confKind], owned.deployment)
+ nobjMngr.AddManagedObject(constants.FLPName+FlpConfSuffix[confKind], owned.daemonSet)
+ nobjMngr.AddManagedObject(constants.FLPName+FlpConfSuffix[confKind], owned.service)
+ nobjMngr.AddManagedObject(constants.FLPName+FlpConfSuffix[confKind], owned.hpa)
+ nobjMngr.AddManagedObject(configMapName+FlpConfSuffix[confKind], owned.configMap)
return singleFLPReconciler{ClientHelper: cl, nobjMngr: nobjMngr, owned: owned, confKind: confKind}
}
@@ -99,6 +100,13 @@ func validateDesired(desired *flpSpec) error {
return nil
}
+func (r *FLPReconciler) GetServiceName(desiredFLP *flpSpec) string {
+ if ingestorNeeded, _ := checkDeployNeeded(desiredFLP, ConfKafkaIngestor); ingestorNeeded {
+ return constants.FLPName + FlpConfSuffix[ConfKafkaIngestor]
+ }
+ return constants.FLPName + FlpConfSuffix[ConfSingle]
+}
+
func (r *FLPReconciler) Reconcile(ctx context.Context, desiredFLP *flpSpec, desiredLoki *lokiSpec) error {
for _, singleFlp := range r.singleReconcilers {
err := singleFlp.Reconcile(ctx, desiredFLP, desiredLoki)
@@ -109,6 +117,21 @@ func (r *FLPReconciler) Reconcile(ctx context.Context, desiredFLP *flpSpec, desi
return nil
}
+// checkDeployNeeded tells whether the given confKind should be deployed
+func checkDeployNeeded(desiredFLP *flpSpec, confKind string) (bool, error) {
+ switch confKind {
+ case ConfSingle:
+ return desiredFLP.Kafka == nil, nil
+ case ConfKafkaTransformer:
+ return desiredFLP.Kafka != nil, nil
+ case ConfKafkaIngestor:
+ //TODO should be disabled if ebpf-agent is enabled with kafka
+ return desiredFLP.Kafka != nil, nil
+ default:
+ return false, fmt.Errorf("unknown flowlogs-pipeline config kind: %s", confKind)
+ }
+}
+
// Reconcile is the reconciler entry point to reconcile the current flowlogs-pipeline state with the desired configuration
func (r *singleFLPReconciler) Reconcile(ctx context.Context, desiredFLP *flpSpec, desiredLoki *lokiSpec) error {
err := validateDesired(desiredFLP)
@@ -116,6 +139,15 @@ func (r *singleFLPReconciler) Reconcile(ctx context.Context, desiredFLP *flpSpec
return err
}
+ shouldDeploy, err := checkDeployNeeded(desiredFLP, r.confKind)
+ if err != nil {
+ return err
+ }
+ if !shouldDeploy {
+ r.nobjMngr.CleanupNamespace(ctx)
+ return nil
+ }
+
builder := newBuilder(r.nobjMngr.Namespace, desiredFLP, desiredLoki, r.confKind)
// Retrieve current owned objects
err = r.nobjMngr.FetchAll(ctx)
@@ -137,6 +169,9 @@ func (r *singleFLPReconciler) Reconcile(ctx context.Context, desiredFLP *flpSpec
case constants.DeploymentKind:
return r.reconcileAsDeployment(ctx, desiredFLP, &builder, configDigest)
case constants.DaemonSetKind:
+ if r.confKind == ConfKafkaTransformer {
+ return r.reconcileAsDeployment(ctx, desiredFLP, &builder, configDigest)
+ }
return r.reconcileAsDaemonSet(ctx, desiredFLP, &builder, configDigest)
default:
return fmt.Errorf("could not reconcile collector, invalid kind: %s", desiredFLP.Kind)
@@ -152,19 +187,13 @@ func (r *singleFLPReconciler) reconcileAsDeployment(ctx context.Context, desired
if err := r.CreateOwned(ctx, builder.deployment(configDigest)); err != nil {
return err
}
- } else if deploymentNeedsUpdate(r.owned.deployment, desiredFLP, configDigest) {
+ } else if deploymentNeedsUpdate(r.owned.deployment, desiredFLP, configDigest, constants.FLPName+FlpConfSuffix[r.confKind]) {
if err := r.UpdateOwned(ctx, r.owned.deployment, builder.deployment(configDigest)); err != nil {
return err
}
}
- if !r.nobjMngr.Exists(r.owned.service) {
- newSVC := builder.service(nil)
- if err := r.CreateOwned(ctx, newSVC); err != nil {
- return err
- }
- } else if serviceNeedsUpdate(r.owned.service, desiredFLP) {
- newSVC := builder.service(r.owned.service)
- if err := r.UpdateOwned(ctx, r.owned.service, newSVC); err != nil {
+ if r.confKind != ConfKafkaTransformer {
+ if err := r.reconcileAsService(ctx, desiredFLP, builder); err != nil {
return err
}
}
@@ -187,6 +216,21 @@ func (r *singleFLPReconciler) reconcileAsDeployment(ctx context.Context, desired
return nil
}
+func (r *singleFLPReconciler) reconcileAsService(ctx context.Context, desiredFLP *flpSpec, builder *builder) error {
+ if !r.nobjMngr.Exists(r.owned.service) {
+ newSVC := builder.service(nil)
+ if err := r.CreateOwned(ctx, newSVC); err != nil {
+ return err
+ }
+ } else if serviceNeedsUpdate(r.owned.service, desiredFLP) {
+ newSVC := builder.service(r.owned.service)
+ if err := r.UpdateOwned(ctx, r.owned.service, newSVC); err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
func (r *singleFLPReconciler) reconcileAsDaemonSet(ctx context.Context, desiredFLP *flpSpec, builder *builder, configDigest string) error {
// Kind may have changed: try delete Deployment / Service / HPA and create DaemonSet
r.nobjMngr.TryDelete(ctx, r.owned.deployment)
@@ -194,7 +238,7 @@ func (r *singleFLPReconciler) reconcileAsDaemonSet(ctx context.Context, desiredF
r.nobjMngr.TryDelete(ctx, r.owned.hpa)
if !r.nobjMngr.Exists(r.owned.daemonSet) {
return r.CreateOwned(ctx, builder.daemonSet(configDigest))
- } else if daemonSetNeedsUpdate(r.owned.daemonSet, desiredFLP, configDigest) {
+ } else if daemonSetNeedsUpdate(r.owned.daemonSet, desiredFLP, configDigest, constants.FLPName+FlpConfSuffix[r.confKind]) {
return r.UpdateOwned(ctx, r.owned.daemonSet, builder.daemonSet(configDigest))
}
return nil
@@ -224,13 +268,13 @@ func (r *FLPReconciler) createPermissions(ctx context.Context, firstInstall bool
return nil
}
-func daemonSetNeedsUpdate(ds *appsv1.DaemonSet, desired *flpSpec, configDigest string) bool {
- return containerNeedsUpdate(&ds.Spec.Template.Spec, desired, true) ||
+func daemonSetNeedsUpdate(ds *appsv1.DaemonSet, desired *flpSpec, configDigest string, name string) bool {
+ return containerNeedsUpdate(&ds.Spec.Template.Spec, desired, true, name) ||
configChanged(&ds.Spec.Template, configDigest)
}
-func deploymentNeedsUpdate(depl *appsv1.Deployment, desired *flpSpec, configDigest string) bool {
- return containerNeedsUpdate(&depl.Spec.Template.Spec, desired, false) ||
+func deploymentNeedsUpdate(depl *appsv1.Deployment, desired *flpSpec, configDigest string, name string) bool {
+ return containerNeedsUpdate(&depl.Spec.Template.Spec, desired, false, name) ||
configChanged(&depl.Spec.Template, configDigest) ||
(desired.HPA == nil && *depl.Spec.Replicas != desired.Replicas)
}
@@ -248,10 +292,10 @@ func serviceNeedsUpdate(svc *corev1.Service, desired *flpSpec) bool {
return true
}
-func containerNeedsUpdate(podSpec *corev1.PodSpec, desired *flpSpec, expectHostPort bool) bool {
+func containerNeedsUpdate(podSpec *corev1.PodSpec, desired *flpSpec, expectHostPort bool, name string) bool {
// Note, we don't check for changed port / host port here, because that would change also the configmap,
// which also triggers pod update anyway
- container := reconcilers.FindContainer(podSpec, constants.FLPName)
+ container := reconcilers.FindContainer(podSpec, name)
return container == nil ||
desired.Image != container.Image ||
desired.ImagePullPolicy != string(container.ImagePullPolicy) ||
diff --git a/controllers/flowlogspipeline/flp_test.go b/controllers/flowlogspipeline/flp_test.go
index 6182bb88b..1ec8c4b0b 100644
--- a/controllers/flowlogspipeline/flp_test.go
+++ b/controllers/flowlogspipeline/flp_test.go
@@ -148,7 +148,7 @@ func TestDaemonSetNoChange(t *testing.T) {
b = newBuilder(ns, &flp, &loki, ConfSingle)
_, digest = b.configMap()
- assert.False(daemonSetNeedsUpdate(first, &flp, digest))
+ assert.False(daemonSetNeedsUpdate(first, &flp, digest, constants.FLPName+FlpConfSuffix[ConfSingle]))
}
func TestDaemonSetChanged(t *testing.T) {
@@ -168,7 +168,7 @@ func TestDaemonSetChanged(t *testing.T) {
_, digest = b.configMap()
second := b.daemonSet(digest)
- assert.True(daemonSetNeedsUpdate(first, &flp, digest))
+ assert.True(daemonSetNeedsUpdate(first, &flp, digest, constants.FLPName+FlpConfSuffix[ConfSingle]))
// Check log level change
flp.LogLevel = "info"
@@ -176,7 +176,7 @@ func TestDaemonSetChanged(t *testing.T) {
_, digest = b.configMap()
third := b.daemonSet(digest)
- assert.True(daemonSetNeedsUpdate(second, &flp, digest))
+ assert.True(daemonSetNeedsUpdate(second, &flp, digest, constants.FLPName+FlpConfSuffix[ConfSingle]))
// Check resource change
flp.Resources.Limits = map[corev1.ResourceName]resource.Quantity{
@@ -187,7 +187,7 @@ func TestDaemonSetChanged(t *testing.T) {
_, digest = b.configMap()
fourth := b.daemonSet(digest)
- assert.True(daemonSetNeedsUpdate(third, &flp, digest))
+ assert.True(daemonSetNeedsUpdate(third, &flp, digest, constants.FLPName+FlpConfSuffix[ConfSingle]))
// Check reverting limits
flp.Resources.Limits = map[corev1.ResourceName]resource.Quantity{
@@ -197,8 +197,8 @@ func TestDaemonSetChanged(t *testing.T) {
b = newBuilder(ns, &flp, &loki, ConfSingle)
_, digest = b.configMap()
- assert.True(daemonSetNeedsUpdate(fourth, &flp, digest))
- assert.False(daemonSetNeedsUpdate(third, &flp, digest))
+ assert.True(daemonSetNeedsUpdate(fourth, &flp, digest, constants.FLPName+FlpConfSuffix[ConfSingle]))
+ assert.False(daemonSetNeedsUpdate(third, &flp, digest, constants.FLPName+FlpConfSuffix[ConfSingle]))
}
func TestDeploymentNoChange(t *testing.T) {
@@ -218,7 +218,7 @@ func TestDeploymentNoChange(t *testing.T) {
b = newBuilder(ns, &flp, &loki, ConfSingle)
_, digest = b.configMap()
- assert.False(deploymentNeedsUpdate(first, &flp, digest))
+ assert.False(deploymentNeedsUpdate(first, &flp, digest, constants.FLPName+FlpConfSuffix[ConfSingle]))
}
func TestDeploymentChanged(t *testing.T) {
@@ -238,7 +238,7 @@ func TestDeploymentChanged(t *testing.T) {
_, digest = b.configMap()
second := b.deployment(digest)
- assert.True(deploymentNeedsUpdate(first, &flp, digest))
+ assert.True(deploymentNeedsUpdate(first, &flp, digest, constants.FLPName+FlpConfSuffix[ConfSingle]))
// Check log level change
flp.LogLevel = "info"
@@ -246,7 +246,7 @@ func TestDeploymentChanged(t *testing.T) {
_, digest = b.configMap()
third := b.deployment(digest)
- assert.True(deploymentNeedsUpdate(second, &flp, digest))
+ assert.True(deploymentNeedsUpdate(second, &flp, digest, constants.FLPName+FlpConfSuffix[ConfSingle]))
// Check resource change
flp.Resources.Limits = map[corev1.ResourceName]resource.Quantity{
@@ -257,7 +257,7 @@ func TestDeploymentChanged(t *testing.T) {
_, digest = b.configMap()
fourth := b.deployment(digest)
- assert.True(deploymentNeedsUpdate(third, &flp, digest))
+ assert.True(deploymentNeedsUpdate(third, &flp, digest, constants.FLPName+FlpConfSuffix[ConfSingle]))
// Check reverting limits
flp.Resources.Limits = map[corev1.ResourceName]resource.Quantity{
@@ -268,8 +268,8 @@ func TestDeploymentChanged(t *testing.T) {
_, digest = b.configMap()
fifth := b.deployment(digest)
- assert.True(deploymentNeedsUpdate(fourth, &flp, digest))
- assert.False(deploymentNeedsUpdate(third, &flp, digest))
+ assert.True(deploymentNeedsUpdate(fourth, &flp, digest, constants.FLPName+FlpConfSuffix[ConfSingle]))
+ assert.False(deploymentNeedsUpdate(third, &flp, digest, constants.FLPName+FlpConfSuffix[ConfSingle]))
// Check replicas didn't change because HPA is used
flp2 := flp
@@ -277,7 +277,7 @@ func TestDeploymentChanged(t *testing.T) {
b = newBuilder(ns, &flp2, &loki, ConfSingle)
_, digest = b.configMap()
- assert.False(deploymentNeedsUpdate(fifth, &flp2, digest))
+ assert.False(deploymentNeedsUpdate(fifth, &flp2, digest, constants.FLPName+FlpConfSuffix[ConfSingle]))
}
func TestDeploymentChangedReplicasNoHPA(t *testing.T) {
@@ -297,7 +297,7 @@ func TestDeploymentChangedReplicasNoHPA(t *testing.T) {
b = newBuilder(ns, &flp2, &loki, ConfSingle)
_, digest = b.configMap()
- assert.True(deploymentNeedsUpdate(first, &flp2, digest))
+ assert.True(deploymentNeedsUpdate(first, &flp2, digest, constants.FLPName+FlpConfSuffix[ConfSingle]))
}
func TestServiceNoChange(t *testing.T) {
@@ -434,3 +434,33 @@ func TestLabels(t *testing.T) {
assert.Equal("dev", svc.Labels["version"])
assert.Empty(svc.Spec.Selector["version"])
}
+
+func TestDeployNeeded(t *testing.T) {
+ assert := assert.New(t)
+
+ flp := getFLPConfig()
+
+ // Kafka not configured
+ res, err := checkDeployNeeded(&flp, ConfSingle)
+ assert.True(res)
+ assert.NoError(err)
+ res, err = checkDeployNeeded(&flp, ConfKafkaIngestor)
+ assert.False(res)
+ assert.NoError(err)
+ res, err = checkDeployNeeded(&flp, ConfKafkaTransformer)
+ assert.False(res)
+ assert.NoError(err)
+
+ // Kafka configured
+ flp.Kafka = &flowsv1alpha1.FlowCollectorKafka{Address: "localhost:9092", Topic: "FLP"}
+ res, err = checkDeployNeeded(&flp, ConfSingle)
+ assert.False(res)
+ assert.NoError(err)
+ res, err = checkDeployNeeded(&flp, ConfKafkaIngestor)
+ assert.True(res)
+ assert.NoError(err)
+ res, err = checkDeployNeeded(&flp, ConfKafkaTransformer)
+ assert.True(res)
+ assert.NoError(err)
+
+}
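A hypothetical companion test (not included in this change) that would also pin down GetServiceName, reusing getFLPConfig and the exported suffix map; the Kafka values are placeholders:

```go
func TestGetServiceName(t *testing.T) {
	assert := assert.New(t)

	r := FLPReconciler{}
	flp := getFLPConfig()

	// Without Kafka, collectors keep targeting the all-in-one FLP service.
	assert.Equal(constants.FLPName+FlpConfSuffix[ConfSingle], r.GetServiceName(&flp))

	// With Kafka, flows must be sent to the ingestor service instead.
	flp.Kafka = &flowsv1alpha1.FlowCollectorKafka{Address: "localhost:9092", Topic: "FLP"}
	assert.Equal(constants.FLPName+FlpConfSuffix[ConfKafkaIngestor], r.GetServiceName(&flp))
}
```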
diff --git a/controllers/ovs/flowsconfig_reconciler.go b/controllers/ovs/flowsconfig_reconciler.go
index e36599632..2aac77318 100644
--- a/controllers/ovs/flowsconfig_reconciler.go
+++ b/controllers/ovs/flowsconfig_reconciler.go
@@ -40,14 +40,14 @@ func NewFlowsConfigController(client reconcilers.ClientHelper,
// Reconcile reconciles the status of the ovs-flows-config configmap with
// the target FlowCollector ipfix section map
func (c *FlowsConfigController) Reconcile(
- ctx context.Context, target *flowsv1alpha1.FlowCollector) error {
+ ctx context.Context, target *flowsv1alpha1.FlowCollector, flpServiceName string) error {
rlog := log.FromContext(ctx, "component", "FlowsConfigController")
current, err := c.current(ctx)
if err != nil {
return err
}
- desired, err := c.desired(ctx, target)
+ desired, err := c.desired(ctx, target, flpServiceName)
// compare current and desired
if err != nil {
return err
@@ -95,7 +95,7 @@ func (c *FlowsConfigController) current(ctx context.Context) (*flowsConfig, erro
}
func (c *FlowsConfigController) desired(
- ctx context.Context, coll *flowsv1alpha1.FlowCollector) (*flowsConfig, error) {
+ ctx context.Context, coll *flowsv1alpha1.FlowCollector, flpServiceName string) (*flowsConfig, error) {
conf := flowsConfig{FlowCollectorIPFIX: coll.Spec.IPFIX}
@@ -110,7 +110,7 @@ func (c *FlowsConfigController) desired(
svc := corev1.Service{}
if err := c.client.Get(ctx, types.NamespacedName{
Namespace: c.collectorNamespace,
- Name: constants.FLPName,
+ Name: flpServiceName,
}, &svc); err != nil {
return nil, fmt.Errorf("can't get service %s in %s: %w", constants.FLPName, c.collectorNamespace, err)
}
diff --git a/controllers/reconcilers/namespaced_objects_manager.go b/controllers/reconcilers/namespaced_objects_manager.go
index 55ac98f80..f3ad8cac9 100644
--- a/controllers/reconcilers/namespaced_objects_manager.go
+++ b/controllers/reconcilers/namespaced_objects_manager.go
@@ -75,7 +75,7 @@ func (m *NamespacedObjectManager) CleanupNamespace(ctx context.Context) {
ref.SetNamespace(namespace)
log.Info("Deleting old "+obj.kind, "Namespace", namespace, "Name", obj.name)
err := m.client.Delete(ctx, ref)
- if err != nil {
+ if client.IgnoreNotFound(err) != nil {
log.Error(err, "Failed to delete old "+obj.kind, "Namespace", namespace, "Name", obj.name)
}
}
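For context, a standalone sketch (the object name below is illustrative): with the extra FLP variants, cleanup now routinely targets objects that were never created, and client.IgnoreNotFound filters out exactly those errors while letting real failures through:

```go
package main

import (
	"fmt"

	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

func main() {
	notFound := apierrors.NewNotFound(
		schema.GroupResource{Group: "apps", Resource: "daemonsets"},
		"flowlogs-pipeline-kingestor")
	fmt.Println(client.IgnoreNotFound(notFound))           // <nil>: skipped, not logged
	fmt.Println(client.IgnoreNotFound(fmt.Errorf("boom"))) // boom: still reported
}
```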
diff --git a/docs/FlowCollector.md b/docs/FlowCollector.md
index 026bca114..63fd430a4 100644
--- a/docs/FlowCollector.md
+++ b/docs/FlowCollector.md
@@ -1244,6 +1244,13 @@ FlowlogsPipeline contains settings related to the flowlogs-pipeline component
Default: IfNotPresent
+| Name | Type | Description | Required |
+| ---- | ---- | ----------- | -------- |
+| address | string | Address of the Kafka server<br/><br/>Default: "" | true |
+| topic | string | Name of the Kafka topic to use<br/><br/>Default: "" | true |