diff --git a/README.md b/README.md
index 23f1a9713..5bfa5e774 100644
--- a/README.md
+++ b/README.md
@@ -1,4 +1,4 @@
-
+
diff --git a/charts/logging-operator/templates/clusterrole.yaml b/charts/logging-operator/templates/clusterrole.yaml
index 0eeeb7262..bdc7e9301 100644
--- a/charts/logging-operator/templates/clusterrole.yaml
+++ b/charts/logging-operator/templates/clusterrole.yaml
@@ -290,6 +290,12 @@ rules:
- get
- patch
- update
+- apiGroups:
+ - logging.banzaicloud.io
+ resources:
+ - loggings/finalizers
+ verbs:
+ - update
- apiGroups:
- logging.banzaicloud.io
resources:
diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml
index aaf9e98a2..524dedd99 100644
--- a/config/rbac/role.yaml
+++ b/config/rbac/role.yaml
@@ -290,6 +290,12 @@ rules:
- get
- patch
- update
+- apiGroups:
+ - logging.banzaicloud.io
+ resources:
+ - loggings/finalizers
+ verbs:
+ - update
- apiGroups:
- logging.banzaicloud.io
resources:
diff --git a/config/samples/fluentdconfig.yaml b/config/samples/fluentdconfig.yaml
new file mode 100644
index 000000000..34a2bb50f
--- /dev/null
+++ b/config/samples/fluentdconfig.yaml
@@ -0,0 +1,18 @@
+apiVersion: v1
+kind: Namespace
+metadata:
+ name: logging
+---
+apiVersion: logging.banzaicloud.io/v1beta1
+kind: Logging
+metadata:
+ name: fluentd-config
+spec:
+ controlNamespace: logging
+---
+apiVersion: logging.banzaicloud.io/v1beta1
+kind: FluentdConfig
+metadata:
+ name: fluentd-config
+ namespace: logging
+spec: {}
diff --git a/config/samples/multitenant-routing/logging/tenant-infra-logging.yaml b/config/samples/multitenant-routing/logging/tenant-infra-logging.yaml
index 80960770a..d56fce83c 100644
--- a/config/samples/multitenant-routing/logging/tenant-infra-logging.yaml
+++ b/config/samples/multitenant-routing/logging/tenant-infra-logging.yaml
@@ -51,10 +51,6 @@ spec:
loggingRef: infra
inputTail:
storage.type: filesystem
- forwardOptions:
- Workers: 0
- syslogng_output:
- Workers: 0
positiondb:
hostPath:
path: ""
@@ -63,6 +59,9 @@ spec:
path: ""
network:
connectTimeout: 2
+ metrics: {}
+ image:
+ tag: 2.1.8-debug
---
apiVersion: logging.banzaicloud.io/v1beta1
kind: LoggingRoute
diff --git a/controllers/logging/logging_controller.go b/controllers/logging/logging_controller.go
index 2e09eb80d..7ded214cb 100644
--- a/controllers/logging/logging_controller.go
+++ b/controllers/logging/logging_controller.go
@@ -31,8 +31,10 @@ import (
"github.com/prometheus/client_golang/prometheus"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
+ "k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
+ "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/metrics"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
@@ -52,23 +54,26 @@ import (
)
// NewLoggingReconciler returns a new LoggingReconciler instance
-func NewLoggingReconciler(client client.Client, log logr.Logger) *LoggingReconciler {
+func NewLoggingReconciler(client client.Client, eventRecorder record.EventRecorder, log logr.Logger) *LoggingReconciler {
return &LoggingReconciler{
- Client: client,
- Log: log,
+ Client: client,
+ EventRecorder: eventRecorder,
+ Log: log,
}
}
// LoggingReconciler reconciles a Logging object
type LoggingReconciler struct {
client.Client
- Log logr.Logger
+ EventRecorder record.EventRecorder
+ Log logr.Logger
}
// +kubebuilder:rbac:groups=logging.banzaicloud.io,resources=loggings;fluentbitagents;flows;clusterflows;outputs;clusteroutputs;nodeagents;fluentdconfigs;syslogngconfigs,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=logging.banzaicloud.io,resources=loggings/status;fluentbitagents/status;flows/status;clusterflows/status;outputs/status;clusteroutputs/status;nodeagents/status;fluentdconfigs/status;syslogngconfigs/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=logging.banzaicloud.io,resources=syslogngflows;syslogngclusterflows;syslogngoutputs;syslogngclusteroutputs,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=logging.banzaicloud.io,resources=syslogngflows/status;syslogngclusterflows/status;syslogngoutputs/status;syslogngclusteroutputs/status,verbs=get;update;patch
+// +kubebuilder:rbac:groups=logging.banzaicloud.io,resources=loggings/finalizers,verbs=update
// +kubebuilder:rbac:groups="",resources=configmaps;secrets,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=extensions;apps,resources=deployments,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=extensions;networking.k8s.io,resources=ingresses,verbs=get;list;watch;create;update;patch;delete
@@ -128,7 +133,8 @@ func (r *LoggingReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
return reconcile.Result{}, errors.WrapIfWithDetails(err, "failed to get logging resources", "logging", logging)
}
- r.dynamicDefaults(ctx, log, loggingResources.GetSyslogNGSpec())
+ _, syslogNGSPec := loggingResources.GetSyslogNGSpec()
+ r.dynamicDefaults(ctx, log, syslogNGSPec)
// metrics
defer func() {
@@ -177,7 +183,7 @@ func (r *LoggingReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
var loggingDataProvider loggingdataprovider.LoggingDataProvider
- fluentdSpec := loggingResources.GetFluentdSpec()
+ fluentdExternal, fluentdSpec := loggingResources.GetFluentd()
if fluentdSpec != nil {
fluentdConfig, secretList, err := r.clusterConfigurationFluentd(loggingResources)
if err != nil {
@@ -188,12 +194,12 @@ func (r *LoggingReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
} else {
log.V(1).Info("flow configuration", "config", fluentdConfig)
- reconcilers = append(reconcilers, fluentd.New(r.Client, r.Log, &logging, fluentdSpec, &fluentdConfig, secretList, reconcilerOpts).Reconcile)
+ reconcilers = append(reconcilers, fluentd.New(r.Client, r.Log, &logging, fluentdSpec, fluentdExternal, &fluentdConfig, secretList, reconcilerOpts).Reconcile)
}
- loggingDataProvider = fluentd.NewDataProvider(r.Client, &logging, fluentdSpec)
+ loggingDataProvider = fluentd.NewDataProvider(r.Client, &logging, fluentdSpec, fluentdExternal)
}
- syslogNGSpec := loggingResources.GetSyslogNGSpec()
+ syslogNGExternal, syslogNGSpec := loggingResources.GetSyslogNGSpec()
if syslogNGSpec != nil {
syslogNGConfig, secretList, err := r.clusterConfigurationSyslogNG(loggingResources)
if err != nil {
@@ -204,9 +210,9 @@ func (r *LoggingReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
} else {
log.V(1).Info("flow configuration", "config", syslogNGConfig)
- reconcilers = append(reconcilers, syslogng.New(r.Client, r.Log, &logging, syslogNGSpec, syslogNGConfig, secretList, reconcilerOpts).Reconcile)
+ reconcilers = append(reconcilers, syslogng.New(r.Client, r.Log, &logging, syslogNGSpec, syslogNGExternal, syslogNGConfig, secretList, reconcilerOpts).Reconcile)
}
- loggingDataProvider = syslogng.NewDataProvider(r.Client, &logging)
+ loggingDataProvider = syslogng.NewDataProvider(r.Client, &logging, syslogNGExternal)
}
switch len(loggingResources.Fluentbits) {
@@ -261,7 +267,7 @@ func (r *LoggingReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
log.Error(errors.New("nodeagent definition conflict"), problem)
}
}
- reconcilers = append(reconcilers, nodeagent.New(r.Client, r.Log, &logging, fluentdSpec, agents, reconcilerOpts, fluentd.NewDataProvider(r.Client, &logging, fluentdSpec)).Reconcile)
+ reconcilers = append(reconcilers, nodeagent.New(r.Client, r.Log, &logging, fluentdSpec, agents, reconcilerOpts, fluentd.NewDataProvider(r.Client, &logging, fluentdSpec, fluentdExternal)).Reconcile)
}
for _, rec := range reconcilers {
@@ -275,9 +281,73 @@ func (r *LoggingReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
}
}
+ if shouldReturn, err := r.fluentdConfigFinalizer(ctx, &logging, fluentdExternal); shouldReturn || err != nil {
+ return ctrl.Result{}, err
+ }
+
+ if shouldReturn, err := r.syslogNGConfigFinalizer(ctx, &logging, syslogNGExternal); shouldReturn || err != nil {
+ return ctrl.Result{}, err
+ }
+
return ctrl.Result{}, nil
}
+func (r *LoggingReconciler) fluentdConfigFinalizer(ctx context.Context, logging *loggingv1beta1.Logging, externalFluentd *loggingv1beta1.FluentdConfig) (bool, error) {
+ fluentdConfigFinalizer := "fluentdconfig.logging.banzaicloud.io/finalizer"
+
+ if logging.DeletionTimestamp.IsZero() {
+ if externalFluentd != nil && !controllerutil.ContainsFinalizer(logging, fluentdConfigFinalizer) {
+ r.Log.Info("adding fluentdconfig finalizer")
+ controllerutil.AddFinalizer(logging, fluentdConfigFinalizer)
+ if err := r.Update(ctx, logging); err != nil {
+ return true, err
+ }
+ }
+ } else if externalFluentd != nil {
+ msg := fmt.Sprintf("refused to delete logging resource while fluentdConfig %s exists", client.ObjectKeyFromObject(externalFluentd))
+ r.EventRecorder.Event(logging, corev1.EventTypeWarning, "DeletionRefused", msg)
+ return false, errors.New(msg)
+ }
+
+ if controllerutil.ContainsFinalizer(logging, fluentdConfigFinalizer) && externalFluentd == nil {
+ r.Log.Info("removing fluentdconfig finalizer")
+ controllerutil.RemoveFinalizer(logging, fluentdConfigFinalizer)
+ if err := r.Update(ctx, logging); err != nil {
+ return true, err
+ }
+ }
+
+ return false, nil
+}
+
+func (r *LoggingReconciler) syslogNGConfigFinalizer(ctx context.Context, logging *loggingv1beta1.Logging, externalSyslogNG *loggingv1beta1.SyslogNGConfig) (bool, error) {
+ syslogNGConfigFinalizer := "syslogngconfig.logging.banzaicloud.io/finalizer"
+
+ if logging.DeletionTimestamp.IsZero() {
+ if externalSyslogNG != nil && !controllerutil.ContainsFinalizer(logging, syslogNGConfigFinalizer) {
+ r.Log.Info("adding syslogngconfig finalizer")
+ controllerutil.AddFinalizer(logging, syslogNGConfigFinalizer)
+ if err := r.Update(ctx, logging); err != nil {
+ return true, err
+ }
+ }
+ } else if externalSyslogNG != nil {
+ msg := fmt.Sprintf("refused to delete logging resource while syslogNGConfig %s exists", client.ObjectKeyFromObject(externalSyslogNG))
+ r.EventRecorder.Event(logging, corev1.EventTypeWarning, "DeletionRefused", msg)
+ return false, errors.New(msg)
+ }
+
+ if controllerutil.ContainsFinalizer(logging, syslogNGConfigFinalizer) && externalSyslogNG == nil {
+ r.Log.Info("removing syslogngconfig finalizer")
+ controllerutil.RemoveFinalizer(logging, syslogNGConfigFinalizer)
+ if err := r.Update(ctx, logging); err != nil {
+ return true, err
+ }
+ }
+
+ return false, nil
+}
+
func (r *LoggingReconciler) dynamicDefaults(ctx context.Context, log logr.Logger, syslogNGSpec *v1beta1.SyslogNGSpec) {
nodes := corev1.NodeList{}
if err := r.Client.List(ctx, &nodes); err != nil {
@@ -372,6 +442,7 @@ func (r *LoggingReconciler) clusterConfigurationSyslogNG(resources model.Logging
Path: syslogng.OutputSecretPath,
}
+ _, syslogngSpec := resources.GetSyslogNGSpec()
in := syslogngconfig.Input{
Name: resources.Logging.Name,
Namespace: resources.Logging.Namespace,
@@ -381,7 +452,7 @@ func (r *LoggingReconciler) clusterConfigurationSyslogNG(resources model.Logging
Flows: resources.SyslogNG.Flows,
SecretLoaderFactory: &slf,
SourcePort: syslogng.ServicePort,
- SyslogNGSpec: resources.GetSyslogNGSpec(),
+ SyslogNGSpec: syslogngSpec,
}
var b strings.Builder
if err := syslogngconfig.RenderConfigInto(in, &b); err != nil {
diff --git a/controllers/logging/logging_controller_test.go b/controllers/logging/logging_controller_test.go
index 6ae2daa28..2601bb524 100644
--- a/controllers/logging/logging_controller_test.go
+++ b/controllers/logging/logging_controller_test.go
@@ -41,12 +41,13 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
+ metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
+
controllers "github.com/kube-logging/logging-operator/controllers/logging"
"github.com/kube-logging/logging-operator/pkg/resources/fluentd"
"github.com/kube-logging/logging-operator/pkg/resources/model"
"github.com/kube-logging/logging-operator/pkg/sdk/logging/api/v1beta1"
"github.com/kube-logging/logging-operator/pkg/sdk/logging/model/output"
- metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
)
var (
@@ -1410,7 +1411,7 @@ func beforeEachWithError(t *testing.T, errors chan<- error) func() {
})
g.Expect(err).NotTo(gomega.HaveOccurred())
- flowReconciler := controllers.NewLoggingReconciler(mgr.GetClient(), ctrl.Log.WithName("controllers").WithName("Flow"))
+ flowReconciler := controllers.NewLoggingReconciler(mgr.GetClient(), mgr.GetEventRecorderFor("logging-operator"), ctrl.Log.WithName("controllers").WithName("Flow"))
var stopped bool
wrappedReconciler := duplicateRequest(t, flowReconciler, &stopped, errors)
diff --git a/docs/README.md b/docs/README.md
new file mode 100644
index 000000000..f7a05564a
--- /dev/null
+++ b/docs/README.md
@@ -0,0 +1,7 @@
+### Contents
+
+This folder contains two major class of documents:
+- technical documentation snippets of various features around the operator
+- generated documentation from code under the [configuration](./configuration) folder
+
+End user documentation is available under https://kube-logging.dev
diff --git a/docs/Readme.md b/docs/Readme.md
deleted file mode 100644
index a53deb678..000000000
--- a/docs/Readme.md
+++ /dev/null
@@ -1 +0,0 @@
-The documentation of the Logging operator project is available at the [Banzai Cloud Documentation Page](https://banzaicloud.com/docs/one-eye/logging-operator/).
\ No newline at end of file
diff --git a/docs/examples/logging_logging_default_route.yaml b/docs/examples/logging_logging_default_route.yaml
deleted file mode 100644
index 5785149f6..000000000
--- a/docs/examples/logging_logging_default_route.yaml
+++ /dev/null
@@ -1,11 +0,0 @@
-apiVersion: logging.banzaicloud.io/v1beta1
-kind: Logging
-metadata:
- name: default-logging-simple
-spec:
- fluentd: {}
- fluentbit: {}
- controlNamespace: default
- defaultFlow:
- outputRefs:
- - null-output-sample
diff --git a/docs/fluentbit-flow-control.md b/docs/fluentbit-flow-control.md
new file mode 100644
index 000000000..25f14a47f
--- /dev/null
+++ b/docs/fluentbit-flow-control.md
@@ -0,0 +1,55 @@
+## Flow control with durability in a multi tenant setup
+
+Resources:
+- https://docs.fluentbit.io/manual/administration/backpressure
+- https://docs.fluentbit.io/manual/administration/buffering-and-storage
+- https://docs.fluentbit.io/manual/pipeline/inputs/tail#sqlite-and-write-ahead-logging
+- https://docs.fluentbit.io/manual/administration/monitoring
+- https://docs.fluentbit.io/manual/administration/troubleshooting#dump-internals-signal
+
+### Context
+
+Let's consider we have multiple separate inputs, each sending data to their respective dedicated outputs (using tenant ids in the tags).
+
+### Durability
+
+According to the referenced resources we need `storage.type filesystem` for *every input*
+where we want to avoid losing data. If we just enable this option, there will be no limit
+on how many data fluent-bit should keep on disk.
+
+> Note: we also have to configure the position db to avoid fluent-bit
+> reading the same files from the beginning after a restart
+
+### Memory limit
+
+The limit that is applied by default is `storage.max_chunks_up 128` on the *service* which is a global limit.
+But this only means, that even if fluent-bit writes all chunks to disk, there is a limit on how many
+chunks it can read up and handle in memory at the same time.
+Without any further configuration fluent-bit will write chunks to disk indefinitely and this setting will only
+affect the overall throughput.
+
+### Disk usage limit
+
+In case we want to limit the actual disk usage we need to set `storage.total_limit_size` for
+every *output* individually. This sounds good, but the problem with this option is that it doesn't
+cause any backpressure, rather just starts to discard the oldest data, which obviously results in data loss,
+so this option should be used with care.
+
+### Backpressure
+
+Backpressure can be enabled using `storage.pause_on_chunks_overlimit on` on the *input* which is great, but one important
+caveat again: the limit this setting considers as the trigger event is `storage.max_chunks_up` which is a global limit.
+
+Going back to our main scenario, when one of the outputs is down (tenant is down), chunks for that output start to pile up
+on disk and in memory. When there are more than `storage.max_chunks_up` chunks in memory globally, fluent-bit pauses inputs that
+tries to load additional chunks. It's not clear how fluent-bit decides which output should be paused, but based on our
+observations (using `config/samples/multitenant-routing` for example) this works as expected as only the input that belongs
+to the faulty output is paused and when the output gets back online, the input is resumed immediately.
+
+Also based on fluent-bit's metrics, if an output is permanently down, the chunks that are waiting for that output to be sent
+are not kept in memory, so other input/output pairs are not limited by the throughput.
+
+In case we configure `storage.pause_on_chunks_overlimit` in the inputs we can make sure the disk usage is bounded.
+
+As long as pods are not restarting, the backpressure can prevent log loss, but keep in mind, that since the input is paused,
+data in log files that gets deleted by the container runtime during the output's downtime will get lost.
diff --git a/docs/img/lo.svg b/docs/img/lo.svg
deleted file mode 100644
index 7e6077a26..000000000
--- a/docs/img/lo.svg
+++ /dev/null
@@ -1,69 +0,0 @@
-
diff --git a/main.go b/main.go
index 5e13b9578..cb0e7f34d 100644
--- a/main.go
+++ b/main.go
@@ -165,7 +165,7 @@ func main() {
os.Exit(1)
}
- loggingReconciler := loggingControllers.NewLoggingReconciler(mgr.GetClient(), ctrl.Log.WithName("logging"))
+ loggingReconciler := loggingControllers.NewLoggingReconciler(mgr.GetClient(), mgr.GetEventRecorderFor("logging-operator"), ctrl.Log.WithName("logging"))
if err := (&extensionsControllers.EventTailerReconciler{
Client: mgr.GetClient(),
@@ -216,6 +216,7 @@ func main() {
// +kubebuilder:scaffold:builder
setupLog.Info("starting manager")
+
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
setupLog.Error(err, "problem running manager")
os.Exit(1)
diff --git a/pkg/resources/fluentbit/config.go b/pkg/resources/fluentbit/config.go
index 644052a18..398cb9e44 100644
--- a/pkg/resources/fluentbit/config.go
+++ b/pkg/resources/fluentbit/config.go
@@ -55,21 +55,14 @@ var fluentBitConfigTemplate = `
{{- end }}
{{- end }}
-[INPUT]
- Name tail
- {{- range $key, $value := .Input.Values }}
- {{- if $value }}
- {{ $key }} {{$value}}
- {{- end }}
- {{- end }}
- {{- range $id, $v := .Input.ParserN }}
- {{- if $v }}
- Parse_{{ $id}} {{$v}}
- {{- end }}
- {{- end }}
- {{- if .Input.MultilineParser }}
- multiline.parser {{- range $i, $v := .Input.MultilineParser }}{{ if $i }},{{ end}} {{ $v }}{{ end }}
- {{- end }}
+{{- if .Inputs }}
+{{- range $input := .Inputs }}
+# Tenant: {{ $input.Tenant }}
+{{- template "input" $input }}
+{{- end }}
+{{- else }}
+{{- template "input" .Input }}
+{{- end }}
{{- if not .DisableKubernetesFilter }}
[FILTER]
@@ -111,11 +104,7 @@ var fluentBitConfigTemplate = `
{{- range $target := $out.Targets }}
[OUTPUT]
Name forward
- {{- if $target.AllNamespaces }}
- Match *
- {{- else }}
- Match_Regex {{ $target.NamespaceRegex }}
- {{- end }}
+ Match {{ $target.Match }}
{{- if $out.Upstream.Enabled }}
Upstream {{ $out.Upstream.Config.Path }}
{{- else }}
@@ -149,11 +138,7 @@ var fluentBitConfigTemplate = `
{{- range $target := $out.Targets }}
[OUTPUT]
Name tcp
- {{- if $target.AllNamespaces }}
- Match *
- {{- else }}
- Match_Regex {{ $target.NamespaceRegex }}
- {{- end }}
+ Match {{ $target.Match }}
Host {{ $target.Host }}
Port {{ $target.Port }}
Format json_lines
@@ -203,6 +188,26 @@ var fluentbitNetworkTemplate = `
{{- end }}
`
+var fluentbitInputTemplate = `
+{{- define "input" }}
+[INPUT]
+ Name tail
+ {{- range $key, $value := .Values }}
+ {{- if $value }}
+ {{ $key }} {{$value}}
+ {{- end }}
+ {{- end }}
+ {{- range $id, $v := .ParserN }}
+ {{- if $v }}
+ Parse_{{ $id}} {{$v}}
+ {{- end }}
+ {{- end }}
+ {{- if .MultilineParser }}
+ multiline.parser {{- range $i, $v := .MultilineParser }}{{ if $i }},{{ end}} {{ $v }}{{ end }}
+ {{- end }}
+{{- end }}
+`
+
var upstreamConfigTemplate = `
[UPSTREAM]
Name {{ .Config.Name }}
diff --git a/pkg/resources/fluentbit/configsecret.go b/pkg/resources/fluentbit/configsecret.go
index b2cb23c4b..50f98a338 100644
--- a/pkg/resources/fluentbit/configsecret.go
+++ b/pkg/resources/fluentbit/configsecret.go
@@ -38,6 +38,13 @@ type fluentbitInputConfig struct {
MultilineParser []string
}
+type fluentbitInputConfigWithTenant struct {
+ Tenant string
+ Values map[string]string
+ ParserN []string
+ MultilineParser []string
+}
+
type upstreamNode struct {
Name string
Host string
@@ -63,6 +70,7 @@ type fluentBitConfig struct {
CoroStackSize int32
Output map[string]string
Input fluentbitInputConfig
+ Inputs []fluentbitInputConfigWithTenant
DisableKubernetesFilter bool
KubernetesFilter map[string]string
AwsFilter map[string]string
@@ -86,8 +94,8 @@ type fluentForwardOutputConfig struct {
}
type forwardTargetConfig struct {
- AllNamespaces bool
NamespaceRegex string
+ Match string
Host string
Port int32
}
@@ -243,7 +251,7 @@ func (r *Reconciler) configSecret() (runtime.Object, reconciler.DesiredState, er
return nil, nil, errs
}
- fluentdSpec := loggingResources.GetFluentdSpec()
+ _, fluentdSpec := loggingResources.GetFluentd()
if fluentdSpec != nil {
fluentbitTargetHost := r.fluentbitSpec.TargetHost
@@ -361,7 +369,8 @@ func (r *Reconciler) configSecret() (runtime.Object, reconciler.DesiredState, er
}
}
- if loggingResources.GetSyslogNGSpec() != nil {
+ _, syslogNGSPec := loggingResources.GetSyslogNGSpec()
+ if syslogNGSPec != nil {
input.SyslogNGOutput = newSyslogNGOutputConfig()
input.SyslogNGOutput.Host = aggregatorEndpoint(r.Logging, syslogng.ServiceName)
input.SyslogNGOutput.Port = syslogng.ServicePort
@@ -372,6 +381,9 @@ func (r *Reconciler) configSecret() (runtime.Object, reconciler.DesiredState, er
for _, a := range loggingResources.LoggingRoutes {
tenants = append(tenants, a.Status.Tenants...)
}
+ if err := r.configureInputsForTenants(tenants, &input); err != nil {
+ return nil, nil, errors.WrapIf(err, "configuring inputs for target tenants")
+ }
if err := r.configureOutputsForTenants(ctx, tenants, &input); err != nil {
return nil, nil, errors.WrapIf(err, "configuring outputs for target tenants")
}
@@ -379,15 +391,15 @@ func (r *Reconciler) configSecret() (runtime.Object, reconciler.DesiredState, er
// compatibility with existing configuration
if input.FluentForwardOutput != nil {
input.FluentForwardOutput.Targets = append(input.FluentForwardOutput.Targets, forwardTargetConfig{
- AllNamespaces: true,
- Host: input.FluentForwardOutput.TargetHost,
- Port: input.FluentForwardOutput.TargetPort,
+ Match: "*",
+ Host: input.FluentForwardOutput.TargetHost,
+ Port: input.FluentForwardOutput.TargetPort,
})
} else if input.SyslogNGOutput != nil {
input.SyslogNGOutput.Targets = append(input.SyslogNGOutput.Targets, forwardTargetConfig{
- AllNamespaces: true,
- Host: input.SyslogNGOutput.Host,
- Port: input.SyslogNGOutput.Port,
+ Match: "*",
+ Host: input.SyslogNGOutput.Host,
+ Port: input.SyslogNGOutput.Port,
})
}
}
@@ -454,6 +466,10 @@ func generateConfig(input fluentBitConfig) (string, error) {
if err != nil {
return "", errors.WrapIf(err, "parsing fluentbit network nested template")
}
+ tmpl, err = tmpl.Parse(fluentbitInputTemplate)
+ if err != nil {
+ return "", errors.WrapIf(err, "parsing fluentbit input nested template")
+ }
err = tmpl.Execute(output, input)
if err != nil {
return "", errors.WrapIf(err, "executing fluentbit config template")
diff --git a/pkg/resources/fluentbit/tenants.go b/pkg/resources/fluentbit/tenants.go
index 79b95d0e5..55139d146 100644
--- a/pkg/resources/fluentbit/tenants.go
+++ b/pkg/resources/fluentbit/tenants.go
@@ -16,11 +16,14 @@ package fluentbit
import (
"context"
+ "crypto/sha256"
+ "encoding/hex"
"fmt"
"sort"
"strings"
"emperror.dev/errors"
+ "golang.org/x/exp/maps"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -80,11 +83,7 @@ func FindTenants(ctx context.Context, target metav1.LabelSelector, reader client
func (r *Reconciler) configureOutputsForTenants(ctx context.Context, tenants []v1beta1.Tenant, input *fluentBitConfig) error {
var errs error
for _, t := range tenants {
- allNamespaces := len(t.Namespaces) == 0
- namespaceRegex := `.`
- if !allNamespaces {
- namespaceRegex = fmt.Sprintf("^[^_]+_(%s)_", strings.Join(t.Namespaces, "|"))
- }
+ match := fmt.Sprintf("kubernetes.%s.*", hashFromTenantName(t.Name))
logging := &v1beta1.Logging{}
if err := r.resourceReconciler.Client.Get(ctx, types.NamespacedName{Name: t.Name}, logging); err != nil {
return errors.WrapIf(err, "getting logging resource")
@@ -96,25 +95,24 @@ func (r *Reconciler) configureOutputsForTenants(ctx context.Context, tenants []v
continue
}
- if loggingResources.GetFluentdSpec() != nil {
+ _, fluentdSpec := loggingResources.GetFluentd()
+ if fluentdSpec != nil {
if input.FluentForwardOutput == nil {
input.FluentForwardOutput = &fluentForwardOutputConfig{}
}
input.FluentForwardOutput.Targets = append(input.FluentForwardOutput.Targets, forwardTargetConfig{
- AllNamespaces: allNamespaces,
- NamespaceRegex: namespaceRegex,
- Host: aggregatorEndpoint(logging, fluentd.ServiceName),
- Port: fluentd.ServicePort,
+ Match: match,
+ Host: aggregatorEndpoint(logging, fluentd.ServiceName),
+ Port: fluentd.ServicePort,
})
- } else if loggingResources.GetSyslogNGSpec() != nil {
+ } else if _, syslogNGSPec := loggingResources.GetSyslogNGSpec(); syslogNGSPec != nil {
if input.SyslogNGOutput == nil {
input.SyslogNGOutput = newSyslogNGOutputConfig()
}
input.SyslogNGOutput.Targets = append(input.SyslogNGOutput.Targets, forwardTargetConfig{
- AllNamespaces: allNamespaces,
- NamespaceRegex: namespaceRegex,
- Host: aggregatorEndpoint(logging, syslogng.ServiceName),
- Port: syslogng.ServicePort,
+ Match: match,
+ Host: aggregatorEndpoint(logging, syslogng.ServiceName),
+ Port: syslogng.ServicePort,
})
} else {
errs = errors.Append(errs, errors.Errorf("logging %s does not provide any aggregator configured", t.Name))
@@ -122,3 +120,45 @@ func (r *Reconciler) configureOutputsForTenants(ctx context.Context, tenants []v
}
return errs
}
+
+func (r *Reconciler) configureInputsForTenants(tenants []v1beta1.Tenant, input *fluentBitConfig) error {
+ var errs error
+ for _, t := range tenants {
+ allNamespaces := len(t.Namespaces) == 0
+ tenantValues := maps.Clone(input.Input.Values)
+ if !allNamespaces {
+ var paths []string
+ for _, n := range t.Namespaces {
+ paths = append(paths, fmt.Sprintf("/var/log/containers/*_%s_*.log", n))
+ }
+ tenantValues["Path"] = strings.Join(paths, ",")
+ } else {
+ tenantValues["Path"] = "/var/log/containers/*.log"
+ }
+
+ tenantValues["DB"] = fmt.Sprintf("/tail-db/tail-containers-state-%s.db", t.Name)
+ tenantValues["Tag"] = fmt.Sprintf("kubernetes.%s.*", hashFromTenantName(t.Name))
+ // This helps to make sure we apply backpressure on the input, see https://docs.fluentbit.io/manual/administration/backpressure
+ tenantValues["storage.pause_on_chunks_overlimit"] = "on"
+ input.Inputs = append(input.Inputs, fluentbitInputConfigWithTenant{
+ Tenant: t.Name,
+ Values: tenantValues,
+ ParserN: input.Input.ParserN,
+ MultilineParser: input.Input.MultilineParser,
+ })
+ }
+ // the regex will work only if we cut the prefix off. fluentbit doesn't care about the content, just the length
+ input.KubernetesFilter["Kube_Tag_Prefix"] = `kubernetes.0000000000.var.log.containers.`
+ return errs
+}
+
+func hashFromTenantName(input string) string {
+ hasher := sha256.New()
+ hasher.Write([]byte(input))
+ hashBytes := hasher.Sum(nil)
+
+ // Convert the hash to a hex string
+ hashString := hex.EncodeToString(hashBytes)
+
+ return hashString[0:10]
+}
diff --git a/pkg/resources/fluentd/appconfigmap.go b/pkg/resources/fluentd/appconfigmap.go
index 1dac41421..c6189939f 100644
--- a/pkg/resources/fluentd/appconfigmap.go
+++ b/pkg/resources/fluentd/appconfigmap.go
@@ -262,17 +262,11 @@ func (r *Reconciler) newCheckPod(hashKey string, fluentdSpec v1beta1.FluentdSpec
Tolerations: fluentdSpec.Tolerations,
Affinity: fluentdSpec.Affinity,
PriorityClassName: fluentdSpec.PodPriorityClassName,
- SecurityContext: &corev1.PodSecurityContext{
- RunAsNonRoot: fluentdSpec.Security.PodSecurityContext.RunAsNonRoot,
- FSGroup: fluentdSpec.Security.PodSecurityContext.FSGroup,
- RunAsUser: fluentdSpec.Security.PodSecurityContext.RunAsUser,
- RunAsGroup: fluentdSpec.Security.PodSecurityContext.RunAsGroup,
- SeccompProfile: fluentdSpec.Security.PodSecurityContext.SeccompProfile,
- },
- Volumes: volumes,
- ImagePullSecrets: fluentdSpec.Image.ImagePullSecrets,
- InitContainers: initContainer,
- Containers: container,
+ SecurityContext: fluentdSpec.Security.PodSecurityContext,
+ Volumes: volumes,
+ ImagePullSecrets: fluentdSpec.Image.ImagePullSecrets,
+ InitContainers: initContainer,
+ Containers: container,
},
}
if fluentdSpec.ConfigCheckAnnotations != nil {
diff --git a/pkg/resources/fluentd/dataprovider.go b/pkg/resources/fluentd/dataprovider.go
index ccba81e86..d816178df 100644
--- a/pkg/resources/fluentd/dataprovider.go
+++ b/pkg/resources/fluentd/dataprovider.go
@@ -25,23 +25,25 @@ import (
)
type DataProvider struct {
- client client.Client
- logging *v1beta1.Logging
- fluentdSpec *v1beta1.FluentdSpec
+ client client.Client
+ logging *v1beta1.Logging
+ fluentdSpec *v1beta1.FluentdSpec
+ fluentdConfig *v1beta1.FluentdConfig
}
-func NewDataProvider(client client.Client, logging *v1beta1.Logging, fluentdSpec *v1beta1.FluentdSpec) *DataProvider {
+func NewDataProvider(client client.Client, logging *v1beta1.Logging, fluentdSpec *v1beta1.FluentdSpec, fluentdConfig *v1beta1.FluentdConfig) *DataProvider {
return &DataProvider{
- client: client,
- logging: logging,
- fluentdSpec: fluentdSpec,
+ client: client,
+ logging: logging,
+ fluentdSpec: fluentdSpec,
+ fluentdConfig: fluentdConfig,
}
}
func (p *DataProvider) GetReplicaCount(ctx context.Context) (*int32, error) {
if p.fluentdSpec != nil {
sts := &v1.StatefulSet{}
- om := p.logging.FluentdObjectMeta(StatefulSetName, ComponentFluentd, *p.fluentdSpec)
+ om := p.logging.FluentdObjectMeta(StatefulSetName, ComponentFluentd, *p.fluentdSpec, p.fluentdConfig)
err := p.client.Get(ctx, types.NamespacedName{Namespace: om.Namespace, Name: om.Name}, sts)
if err != nil {
return nil, errors.WrapIf(client.IgnoreNotFound(err), "getting fluentd statefulset")
diff --git a/pkg/resources/fluentd/drainjob.go b/pkg/resources/fluentd/drainjob.go
index 7d7a29c7b..3b056f7e5 100644
--- a/pkg/resources/fluentd/drainjob.go
+++ b/pkg/resources/fluentd/drainjob.go
@@ -65,14 +65,8 @@ func (r *Reconciler) drainerJobFor(pvc corev1.PersistentVolumeClaim, fluentdSpec
Affinity: fluentdSpec.Affinity,
TopologySpreadConstraints: fluentdSpec.TopologySpreadConstraints,
PriorityClassName: fluentdSpec.PodPriorityClassName,
- SecurityContext: &corev1.PodSecurityContext{
- RunAsNonRoot: fluentdSpec.Security.PodSecurityContext.RunAsNonRoot,
- FSGroup: fluentdSpec.Security.PodSecurityContext.FSGroup,
- RunAsUser: fluentdSpec.Security.PodSecurityContext.RunAsUser,
- RunAsGroup: fluentdSpec.Security.PodSecurityContext.RunAsGroup,
- SeccompProfile: fluentdSpec.Security.PodSecurityContext.SeccompProfile,
- },
- RestartPolicy: corev1.RestartPolicyNever,
+ SecurityContext: fluentdSpec.Security.PodSecurityContext,
+ RestartPolicy: corev1.RestartPolicyNever,
},
},
}
diff --git a/pkg/resources/fluentd/fluentd.go b/pkg/resources/fluentd/fluentd.go
index a7e099537..24bdbc18e 100644
--- a/pkg/resources/fluentd/fluentd.go
+++ b/pkg/resources/fluentd/fluentd.go
@@ -17,7 +17,6 @@ package fluentd
import (
"context"
"fmt"
- "time"
"emperror.dev/errors"
"github.com/cisco-open/operator-tools/pkg/reconciler"
@@ -68,8 +67,9 @@ const (
// Reconciler holds info what resource to reconcile
type Reconciler struct {
- Logging *v1beta1.Logging
- fluentdSpec *v1beta1.FluentdSpec
+ Logging *v1beta1.Logging
+ fluentdSpec *v1beta1.FluentdSpec
+ fluentdConfig *v1beta1.FluentdConfig
*reconciler.GenericResourceReconciler
config *string
secrets *secret.MountSecrets
@@ -112,10 +112,11 @@ func (r *Reconciler) getServiceAccount() string {
}
func New(client client.Client, log logr.Logger,
- logging *v1beta1.Logging, fluentdSpec *v1beta1.FluentdSpec, config *string, secrets *secret.MountSecrets, opts reconciler.ReconcilerOpts) *Reconciler {
+ logging *v1beta1.Logging, fluentdSpec *v1beta1.FluentdSpec, fluentdConfig *v1beta1.FluentdConfig, config *string, secrets *secret.MountSecrets, opts reconciler.ReconcilerOpts) *Reconciler {
return &Reconciler{
Logging: logging,
fluentdSpec: fluentdSpec,
+ fluentdConfig: fluentdConfig,
GenericResourceReconciler: reconciler.NewGenericReconciler(client, log, opts),
config: config,
secrets: secrets,
@@ -203,7 +204,7 @@ func (r *Reconciler) Reconcile(ctx context.Context) (*reconcile.Result, error) {
} else {
r.Log.Info("still waiting for the configcheck result...")
}
- return &reconcile.Result{RequeueAfter: time.Minute}, nil
+ return &reconcile.Result{Requeue: true}, nil
}
}
}
@@ -315,7 +316,7 @@ func (r *Reconciler) reconcileDrain(ctx context.Context) (*reconcile.Result, err
}
}
- replicaCount, err := NewDataProvider(r.Client, r.Logging, r.fluentdSpec).GetReplicaCount(ctx)
+ replicaCount, err := NewDataProvider(r.Client, r.Logging, r.fluentdSpec, r.fluentdConfig).GetReplicaCount(ctx)
if err != nil {
return nil, errors.WrapIf(err, "get replica count for fluentd")
}
diff --git a/pkg/resources/fluentd/meta.go b/pkg/resources/fluentd/meta.go
index 9833736ce..b722b4288 100644
--- a/pkg/resources/fluentd/meta.go
+++ b/pkg/resources/fluentd/meta.go
@@ -21,19 +21,29 @@ import (
// FluentdObjectMeta creates an objectMeta for resource fluentd
func (r *Reconciler) FluentdObjectMeta(name, component string) metav1.ObjectMeta {
+ ownerReference := metav1.OwnerReference{
+ APIVersion: r.Logging.APIVersion,
+ Kind: r.Logging.Kind,
+ Name: r.Logging.Name,
+ UID: r.Logging.UID,
+ Controller: util.BoolPointer(true),
+ }
+
+ if r.fluentdConfig != nil {
+ ownerReference = metav1.OwnerReference{
+ APIVersion: r.fluentdConfig.APIVersion,
+ Kind: r.fluentdConfig.Kind,
+ Name: r.fluentdConfig.Name,
+ UID: r.fluentdConfig.UID,
+ Controller: util.BoolPointer(true),
+ }
+ }
+
o := metav1.ObjectMeta{
- Name: r.Logging.QualifiedName(name),
- Namespace: r.Logging.Spec.ControlNamespace,
- Labels: r.Logging.GetFluentdLabels(component, *r.fluentdSpec),
- OwnerReferences: []metav1.OwnerReference{
- {
- APIVersion: r.Logging.APIVersion,
- Kind: r.Logging.Kind,
- Name: r.Logging.Name,
- UID: r.Logging.UID,
- Controller: util.BoolPointer(true),
- },
- },
+ Name: r.Logging.QualifiedName(name),
+ Namespace: r.Logging.Spec.ControlNamespace,
+ Labels: r.Logging.GetFluentdLabels(component, *r.fluentdSpec),
+ OwnerReferences: []metav1.OwnerReference{ownerReference},
}
return *o.DeepCopy()
}
diff --git a/pkg/resources/fluentd/statefulset.go b/pkg/resources/fluentd/statefulset.go
index 85d842d50..2fc0d16f4 100644
--- a/pkg/resources/fluentd/statefulset.go
+++ b/pkg/resources/fluentd/statefulset.go
@@ -125,13 +125,7 @@ func (r *Reconciler) statefulsetSpec() *appsv1.StatefulSetSpec {
PriorityClassName: r.fluentdSpec.PodPriorityClassName,
DNSPolicy: r.fluentdSpec.DNSPolicy,
DNSConfig: r.fluentdSpec.DNSConfig,
- SecurityContext: &corev1.PodSecurityContext{
- RunAsNonRoot: r.fluentdSpec.Security.PodSecurityContext.RunAsNonRoot,
- FSGroup: r.fluentdSpec.Security.PodSecurityContext.FSGroup,
- RunAsUser: r.fluentdSpec.Security.PodSecurityContext.RunAsUser,
- RunAsGroup: r.fluentdSpec.Security.PodSecurityContext.RunAsGroup,
- SeccompProfile: r.fluentdSpec.Security.PodSecurityContext.SeccompProfile,
- },
+ SecurityContext: r.fluentdSpec.Security.PodSecurityContext,
},
},
ServiceName: r.Logging.QualifiedName(ServiceName + "-headless"),
diff --git a/pkg/resources/model/reconciler.go b/pkg/resources/model/reconciler.go
index 4e37d9b74..d4a81c849 100644
--- a/pkg/resources/model/reconciler.go
+++ b/pkg/resources/model/reconciler.go
@@ -238,6 +238,8 @@ func NewValidationReconciler(
resources.Fluentd.Configuration.Status.Active = utils.BoolPointer(true)
resources.Fluentd.Configuration.Status.Logging = resources.Logging.Name
+ } else {
+ resources.Logging.Status.FluentdConfigName = ""
}
if len(resources.SyslogNG.ExcessSyslogNGs) != 0 {
@@ -271,6 +273,8 @@ func NewValidationReconciler(
logger.Info("found detached syslog-ng aggregator, making association, done: ", "name=", resources.Logging.Status.SyslogNGConfigName)
resources.SyslogNG.Configuration.Status.Active = utils.BoolPointer(true)
resources.SyslogNG.Configuration.Status.Logging = resources.Logging.Name
+ } else {
+ resources.Logging.Status.SyslogNGConfigName = ""
}
if !resources.Logging.WatchAllNamespaces() {
diff --git a/pkg/resources/model/resources.go b/pkg/resources/model/resources.go
index afea2a582..6b6898d3b 100644
--- a/pkg/resources/model/resources.go
+++ b/pkg/resources/model/resources.go
@@ -29,23 +29,19 @@ type LoggingResources struct {
WatchNamespaces []string
}
-func (l LoggingResources) getFluentd() *v1beta1.FluentdConfig {
+func (l LoggingResources) getFluentdConfig() *v1beta1.FluentdConfig {
if l.Fluentd.Configuration != nil {
return l.Fluentd.Configuration
}
return nil
}
-func (l LoggingResources) GetFluentdSpec() *v1beta1.FluentdSpec {
-
- if detachedFluentd := l.getFluentd(); detachedFluentd != nil {
- return &detachedFluentd.Spec
- }
- if l.Logging.Spec.FluentdSpec != nil {
- return l.Logging.Spec.FluentdSpec
+func (l LoggingResources) GetFluentd() (*v1beta1.FluentdConfig, *v1beta1.FluentdSpec) {
+ if detachedFluentd := l.getFluentdConfig(); detachedFluentd != nil {
+ return detachedFluentd, &detachedFluentd.Spec
}
- return nil
+ return nil, l.Logging.Spec.FluentdSpec
}
type FluentdLoggingResources struct {
@@ -64,16 +60,13 @@ func (l LoggingResources) getSyslogNG() *v1beta1.SyslogNGConfig {
return nil
}
-func (l LoggingResources) GetSyslogNGSpec() *v1beta1.SyslogNGSpec {
+func (l LoggingResources) GetSyslogNGSpec() (*v1beta1.SyslogNGConfig, *v1beta1.SyslogNGSpec) {
if detachedSyslogNG := l.getSyslogNG(); detachedSyslogNG != nil {
- return &detachedSyslogNG.Spec
- }
- if l.Logging.Spec.SyslogNGSpec != nil {
- return l.Logging.Spec.SyslogNGSpec
+ return detachedSyslogNG, &detachedSyslogNG.Spec
}
+ return nil, l.Logging.Spec.SyslogNGSpec
- return nil
}
type SyslogNGLoggingResources struct {
diff --git a/pkg/resources/model/system.go b/pkg/resources/model/system.go
index 5ac622879..761dd1a7c 100644
--- a/pkg/resources/model/system.go
+++ b/pkg/resources/model/system.go
@@ -32,7 +32,7 @@ import (
func CreateSystem(resources LoggingResources, secrets SecretLoaderFactory, logger logr.Logger) (*types.System, error) {
logging := resources.Logging
- fluentdSpec := resources.GetFluentdSpec()
+ _, fluentdSpec := resources.GetFluentd()
var forwardInput *input.ForwardInputConfig
if fluentdSpec != nil && fluentdSpec.ForwardInputConfig != nil {
diff --git a/pkg/resources/syslogng/dataprovider.go b/pkg/resources/syslogng/dataprovider.go
index 859b13aa9..d5b2ec713 100644
--- a/pkg/resources/syslogng/dataprovider.go
+++ b/pkg/resources/syslogng/dataprovider.go
@@ -26,20 +26,22 @@ import (
)
type DataProvider struct {
- client client.Client
- logging *v1beta1.Logging
+ client client.Client
+ logging *v1beta1.Logging
+ syslogNGSConfig *v1beta1.SyslogNGConfig
}
-func NewDataProvider(client client.Client, logging *v1beta1.Logging) *DataProvider {
+func NewDataProvider(client client.Client, logging *v1beta1.Logging, syslogNGSConfig *v1beta1.SyslogNGConfig) *DataProvider {
return &DataProvider{
- client: client,
- logging: logging,
+ client: client,
+ logging: logging,
+ syslogNGSConfig: syslogNGSConfig,
}
}
func (p *DataProvider) GetReplicaCount(ctx context.Context) (*int32, error) {
sts := &v1.StatefulSet{}
- om := p.logging.SyslogNGObjectMeta(StatefulSetName, ComponentSyslogNG)
+ om := p.logging.SyslogNGObjectMeta(StatefulSetName, ComponentSyslogNG, p.syslogNGSConfig)
err := p.client.Get(ctx, types.NamespacedName{Namespace: om.Namespace, Name: om.Name}, sts)
if err != nil {
return nil, errors.WrapIf(client.IgnoreNotFound(err), "getting syslog-ng statefulset")
diff --git a/pkg/resources/syslogng/meta.go b/pkg/resources/syslogng/meta.go
index fbca79330..a4db1e97a 100644
--- a/pkg/resources/syslogng/meta.go
+++ b/pkg/resources/syslogng/meta.go
@@ -21,19 +21,27 @@ import (
// SyslogNGObjectMeta creates an objectMeta for resource syslog-ng
func (r *Reconciler) SyslogNGObjectMeta(name, component string) metav1.ObjectMeta {
+ ownerReference := metav1.OwnerReference{
+ APIVersion: r.Logging.APIVersion,
+ Kind: r.Logging.Kind,
+ Name: r.Logging.Name,
+ UID: r.Logging.UID,
+ Controller: util.BoolPointer(true),
+ }
+ if r.syslogNGConfig != nil {
+ ownerReference = metav1.OwnerReference{
+ APIVersion: r.syslogNGConfig.APIVersion,
+ Kind: r.syslogNGConfig.Kind,
+ Name: r.syslogNGConfig.Name,
+ UID: r.syslogNGConfig.UID,
+ Controller: util.BoolPointer(true),
+ }
+ }
o := metav1.ObjectMeta{
- Name: r.Logging.QualifiedName(name),
- Namespace: r.Logging.Spec.ControlNamespace,
- Labels: r.Logging.GetSyslogNGLabels(component),
- OwnerReferences: []metav1.OwnerReference{
- {
- APIVersion: r.Logging.APIVersion,
- Kind: r.Logging.Kind,
- Name: r.Logging.Name,
- UID: r.Logging.UID,
- Controller: util.BoolPointer(true),
- },
- },
+ Name: r.Logging.QualifiedName(name),
+ Namespace: r.Logging.Spec.ControlNamespace,
+ Labels: r.Logging.GetSyslogNGLabels(component),
+ OwnerReferences: []metav1.OwnerReference{ownerReference},
}
return *o.DeepCopy()
}
diff --git a/pkg/resources/syslogng/statefulset.go b/pkg/resources/syslogng/statefulset.go
index a6b4b96bb..c4b44c481 100644
--- a/pkg/resources/syslogng/statefulset.go
+++ b/pkg/resources/syslogng/statefulset.go
@@ -43,7 +43,7 @@ func (r *Reconciler) statefulset() (runtime.Object, reconciler.DesiredState, err
}
desired := &appsv1.StatefulSet{
- ObjectMeta: r.Logging.SyslogNGObjectMeta(StatefulSetName, ComponentSyslogNG),
+ ObjectMeta: r.Logging.SyslogNGObjectMeta(StatefulSetName, ComponentSyslogNG, r.syslogNGConfig),
Spec: appsv1.StatefulSetSpec{
PodManagementPolicy: appsv1.OrderedReadyPodManagement,
Selector: &metav1.LabelSelector{
diff --git a/pkg/resources/syslogng/syslogng.go b/pkg/resources/syslogng/syslogng.go
index c01d5d035..526941356 100644
--- a/pkg/resources/syslogng/syslogng.go
+++ b/pkg/resources/syslogng/syslogng.go
@@ -16,7 +16,6 @@ package syslogng
import (
"context"
- "time"
"emperror.dev/errors"
"github.com/cisco-open/operator-tools/pkg/reconciler"
@@ -71,8 +70,9 @@ const (
// Reconciler holds info what resource to reconcile
type Reconciler struct {
- Logging *v1beta1.Logging
- syslogNGSpec *v1beta1.SyslogNGSpec
+ Logging *v1beta1.Logging
+ syslogNGSpec *v1beta1.SyslogNGSpec
+ syslogNGConfig *v1beta1.SyslogNGConfig
*reconciler.GenericResourceReconciler
config string
secrets *secret.MountSecrets
@@ -91,6 +91,7 @@ func New(
log logr.Logger,
logging *v1beta1.Logging,
syslogNGSPec *v1beta1.SyslogNGSpec,
+ syslogNGCOnfig *v1beta1.SyslogNGConfig,
config string,
secrets *secret.MountSecrets,
opts reconciler.ReconcilerOpts,
@@ -98,6 +99,7 @@ func New(
return &Reconciler{
Logging: logging,
syslogNGSpec: syslogNGSPec,
+ syslogNGConfig: syslogNGCOnfig,
GenericResourceReconciler: reconciler.NewGenericReconciler(client, log, opts),
config: config,
secrets: secrets,
@@ -184,7 +186,7 @@ func (r *Reconciler) Reconcile(ctx context.Context) (*reconcile.Result, error) {
} else {
r.Log.Info("still waiting for the configcheck result...")
}
- return &reconcile.Result{RequeueAfter: time.Minute}, nil
+ return &reconcile.Result{Requeue: true}, nil
}
}
}
diff --git a/pkg/sdk/logging/api/v1beta1/logging_types.go b/pkg/sdk/logging/api/v1beta1/logging_types.go
index 08eed053d..d2e2c765b 100644
--- a/pkg/sdk/logging/api/v1beta1/logging_types.go
+++ b/pkg/sdk/logging/api/v1beta1/logging_types.go
@@ -461,20 +461,29 @@ func persistentVolumeModePointer(mode v1.PersistentVolumeMode) *v1.PersistentVol
}
// FluentdObjectMeta creates an objectMeta for resource fluentd
-func (l *Logging) FluentdObjectMeta(name, component string, f FluentdSpec) metav1.ObjectMeta {
+func (l *Logging) FluentdObjectMeta(name, component string, f FluentdSpec, fc *FluentdConfig) metav1.ObjectMeta {
+ ownerReference := metav1.OwnerReference{
+ APIVersion: l.APIVersion,
+ Kind: l.Kind,
+ Name: l.Name,
+ UID: l.UID,
+ Controller: util.BoolPointer(true),
+ }
+
+ if fc != nil {
+ ownerReference = metav1.OwnerReference{
+ APIVersion: fc.APIVersion,
+ Kind: fc.Kind,
+ Name: fc.Name,
+ UID: fc.UID,
+ Controller: util.BoolPointer(true),
+ }
+ }
o := metav1.ObjectMeta{
- Name: l.QualifiedName(name),
- Namespace: l.Spec.ControlNamespace,
- Labels: l.GetFluentdLabels(component, f),
- OwnerReferences: []metav1.OwnerReference{
- {
- APIVersion: l.APIVersion,
- Kind: l.Kind,
- Name: l.Name,
- UID: l.UID,
- Controller: util.BoolPointer(true),
- },
- },
+ Name: l.QualifiedName(name),
+ Namespace: l.Spec.ControlNamespace,
+ Labels: l.GetFluentdLabels(component, f),
+ OwnerReferences: []metav1.OwnerReference{ownerReference},
}
return o
}
@@ -491,20 +500,28 @@ func (l *Logging) GetFluentdLabels(component string, f FluentdSpec) map[string]s
}
// SyslogNGObjectMeta creates an objectMeta for resource syslog-ng
-func (l *Logging) SyslogNGObjectMeta(name, component string) metav1.ObjectMeta {
+func (l *Logging) SyslogNGObjectMeta(name, component string, sc *SyslogNGConfig) metav1.ObjectMeta {
+ ownerReference := metav1.OwnerReference{
+ APIVersion: l.APIVersion,
+ Kind: l.Kind,
+ Name: l.Name,
+ UID: l.UID,
+ Controller: util.BoolPointer(true),
+ }
+ if sc != nil {
+ ownerReference = metav1.OwnerReference{
+ APIVersion: sc.APIVersion,
+ Kind: sc.Kind,
+ Name: sc.Name,
+ UID: sc.UID,
+ Controller: util.BoolPointer(true),
+ }
+ }
o := metav1.ObjectMeta{
- Name: l.QualifiedName(name),
- Namespace: l.Spec.ControlNamespace,
- Labels: l.GetSyslogNGLabels(component),
- OwnerReferences: []metav1.OwnerReference{
- {
- APIVersion: l.APIVersion,
- Kind: l.Kind,
- Name: l.Name,
- UID: l.UID,
- Controller: util.BoolPointer(true),
- },
- },
+ Name: l.QualifiedName(name),
+ Namespace: l.Spec.ControlNamespace,
+ Labels: l.GetSyslogNGLabels(component),
+ OwnerReferences: []metav1.OwnerReference{ownerReference},
}
return o
}