Merge latest updates from release 4.5 back to master #1687

Closed
wants to merge 11 commits
2 changes: 1 addition & 1 deletion README.md
@@ -1,4 +1,4 @@
<p align="center"><img src="docs/img/lo.svg" width="260"></p>
<p align="center"><img src="https://raw.githubusercontent.com/cncf/landscape/master/hosted_logos/logging-operator.svg" width="260"></p>
<p align="center">

<a href="https://goreportcard.com/badge/github.com/kube-logging/logging-operator">
6 changes: 6 additions & 0 deletions charts/logging-operator/templates/clusterrole.yaml
@@ -290,6 +290,12 @@ rules:
  - get
  - patch
  - update
- apiGroups:
  - logging.banzaicloud.io
  resources:
  - loggings/finalizers
  verbs:
  - update
- apiGroups:
  - logging.banzaicloud.io
  resources:
6 changes: 6 additions & 0 deletions config/rbac/role.yaml
@@ -290,6 +290,12 @@ rules:
  - get
  - patch
  - update
- apiGroups:
  - logging.banzaicloud.io
  resources:
  - loggings/finalizers
  verbs:
  - update
- apiGroups:
  - logging.banzaicloud.io
  resources:
18 changes: 18 additions & 0 deletions config/samples/fluentdconfig.yaml
@@ -0,0 +1,18 @@
apiVersion: v1
kind: Namespace
metadata:
  name: logging
---
apiVersion: logging.banzaicloud.io/v1beta1
kind: Logging
metadata:
  name: fluentd-config
spec:
  controlNamespace: logging
---
apiVersion: logging.banzaicloud.io/v1beta1
kind: FluentdConfig
metadata:
  name: fluentd-config
  namespace: logging
spec: {}
@@ -51,10 +51,6 @@ spec:
loggingRef: infra
inputTail:
storage.type: filesystem
forwardOptions:
Workers: 0
syslogng_output:
Workers: 0
positiondb:
hostPath:
path: ""
@@ -63,6 +59,9 @@
path: ""
network:
connectTimeout: 2
metrics: {}
image:
tag: 2.1.8-debug
---
apiVersion: logging.banzaicloud.io/v1beta1
kind: LoggingRoute
97 changes: 84 additions & 13 deletions controllers/logging/logging_controller.go
@@ -31,8 +31,10 @@ import (
"github.com/prometheus/client_golang/prometheus"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/metrics"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
@@ -52,23 +54,26 @@ import (
)

// NewLoggingReconciler returns a new LoggingReconciler instance
func NewLoggingReconciler(client client.Client, log logr.Logger) *LoggingReconciler {
func NewLoggingReconciler(client client.Client, eventRecorder record.EventRecorder, log logr.Logger) *LoggingReconciler {
return &LoggingReconciler{
Client: client,
Log: log,
Client: client,
EventRecorder: eventRecorder,
Log: log,
}
}

// LoggingReconciler reconciles a Logging object
type LoggingReconciler struct {
client.Client
Log logr.Logger
EventRecorder record.EventRecorder
Log logr.Logger
}

// +kubebuilder:rbac:groups=logging.banzaicloud.io,resources=loggings;fluentbitagents;flows;clusterflows;outputs;clusteroutputs;nodeagents;fluentdconfigs;syslogngconfigs,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=logging.banzaicloud.io,resources=loggings/status;fluentbitagents/status;flows/status;clusterflows/status;outputs/status;clusteroutputs/status;nodeagents/status;fluentdconfigs/status;syslogngconfigs/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=logging.banzaicloud.io,resources=syslogngflows;syslogngclusterflows;syslogngoutputs;syslogngclusteroutputs,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=logging.banzaicloud.io,resources=syslogngflows/status;syslogngclusterflows/status;syslogngoutputs/status;syslogngclusteroutputs/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=logging.banzaicloud.io,resources=loggings/finalizers,verbs=update
// +kubebuilder:rbac:groups="",resources=configmaps;secrets,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=extensions;apps,resources=deployments,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=extensions;networking.k8s.io,resources=ingresses,verbs=get;list;watch;create;update;patch;delete
@@ -128,7 +133,8 @@ func (r *LoggingReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
return reconcile.Result{}, errors.WrapIfWithDetails(err, "failed to get logging resources", "logging", logging)
}

r.dynamicDefaults(ctx, log, loggingResources.GetSyslogNGSpec())
_, syslogNGSPec := loggingResources.GetSyslogNGSpec()
r.dynamicDefaults(ctx, log, syslogNGSPec)

// metrics
defer func() {
Expand Down Expand Up @@ -177,7 +183,7 @@ func (r *LoggingReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct

var loggingDataProvider loggingdataprovider.LoggingDataProvider

fluentdSpec := loggingResources.GetFluentdSpec()
fluentdExternal, fluentdSpec := loggingResources.GetFluentd()
if fluentdSpec != nil {
fluentdConfig, secretList, err := r.clusterConfigurationFluentd(loggingResources)
if err != nil {
@@ -188,12 +194,12 @@ func (r *LoggingReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
} else {
log.V(1).Info("flow configuration", "config", fluentdConfig)

reconcilers = append(reconcilers, fluentd.New(r.Client, r.Log, &logging, fluentdSpec, &fluentdConfig, secretList, reconcilerOpts).Reconcile)
reconcilers = append(reconcilers, fluentd.New(r.Client, r.Log, &logging, fluentdSpec, fluentdExternal, &fluentdConfig, secretList, reconcilerOpts).Reconcile)
}
loggingDataProvider = fluentd.NewDataProvider(r.Client, &logging, fluentdSpec)
loggingDataProvider = fluentd.NewDataProvider(r.Client, &logging, fluentdSpec, fluentdExternal)
}

syslogNGSpec := loggingResources.GetSyslogNGSpec()
syslogNGExternal, syslogNGSpec := loggingResources.GetSyslogNGSpec()
if syslogNGSpec != nil {
syslogNGConfig, secretList, err := r.clusterConfigurationSyslogNG(loggingResources)
if err != nil {
@@ -204,9 +210,9 @@ func (r *LoggingReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
} else {
log.V(1).Info("flow configuration", "config", syslogNGConfig)

reconcilers = append(reconcilers, syslogng.New(r.Client, r.Log, &logging, syslogNGSpec, syslogNGConfig, secretList, reconcilerOpts).Reconcile)
reconcilers = append(reconcilers, syslogng.New(r.Client, r.Log, &logging, syslogNGSpec, syslogNGExternal, syslogNGConfig, secretList, reconcilerOpts).Reconcile)
}
loggingDataProvider = syslogng.NewDataProvider(r.Client, &logging)
loggingDataProvider = syslogng.NewDataProvider(r.Client, &logging, syslogNGExternal)
}

switch len(loggingResources.Fluentbits) {
@@ -261,7 +267,7 @@ func (r *LoggingReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
log.Error(errors.New("nodeagent definition conflict"), problem)
}
}
reconcilers = append(reconcilers, nodeagent.New(r.Client, r.Log, &logging, fluentdSpec, agents, reconcilerOpts, fluentd.NewDataProvider(r.Client, &logging, fluentdSpec)).Reconcile)
reconcilers = append(reconcilers, nodeagent.New(r.Client, r.Log, &logging, fluentdSpec, agents, reconcilerOpts, fluentd.NewDataProvider(r.Client, &logging, fluentdSpec, fluentdExternal)).Reconcile)
}

for _, rec := range reconcilers {
@@ -275,9 +281,73 @@ func (r *LoggingReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
}
}

	if shouldReturn, err := r.fluentdConfigFinalizer(ctx, &logging, fluentdExternal); shouldReturn || err != nil {
		return ctrl.Result{}, err
	}

	if shouldReturn, err := r.syslogNGConfigFinalizer(ctx, &logging, syslogNGExternal); shouldReturn || err != nil {
		return ctrl.Result{}, err
	}

	return ctrl.Result{}, nil
}

func (r *LoggingReconciler) fluentdConfigFinalizer(ctx context.Context, logging *loggingv1beta1.Logging, externalFluentd *loggingv1beta1.FluentdConfig) (bool, error) {
	fluentdConfigFinalizer := "fluentdconfig.logging.banzaicloud.io/finalizer"

	if logging.DeletionTimestamp.IsZero() {
		if externalFluentd != nil && !controllerutil.ContainsFinalizer(logging, fluentdConfigFinalizer) {
			r.Log.Info("adding fluentdconfig finalizer")
			controllerutil.AddFinalizer(logging, fluentdConfigFinalizer)
			if err := r.Update(ctx, logging); err != nil {
				return true, err
			}
		}
	} else if externalFluentd != nil {
		msg := fmt.Sprintf("refused to delete logging resource while fluentdConfig %s exists", client.ObjectKeyFromObject(externalFluentd))
		r.EventRecorder.Event(logging, corev1.EventTypeWarning, "DeletionRefused", msg)
		return false, errors.New(msg)
	}

	if controllerutil.ContainsFinalizer(logging, fluentdConfigFinalizer) && externalFluentd == nil {
		r.Log.Info("removing fluentdconfig finalizer")
		controllerutil.RemoveFinalizer(logging, fluentdConfigFinalizer)
		if err := r.Update(ctx, logging); err != nil {
			return true, err
		}
	}

	return false, nil
}

func (r *LoggingReconciler) syslogNGConfigFinalizer(ctx context.Context, logging *loggingv1beta1.Logging, externalSyslogNG *loggingv1beta1.SyslogNGConfig) (bool, error) {
	syslogNGConfigFinalizer := "syslogngconfig.logging.banzaicloud.io/finalizer"

	if logging.DeletionTimestamp.IsZero() {
		if externalSyslogNG != nil && !controllerutil.ContainsFinalizer(logging, syslogNGConfigFinalizer) {
			r.Log.Info("adding syslogngconfig finalizer")
			controllerutil.AddFinalizer(logging, syslogNGConfigFinalizer)
			if err := r.Update(ctx, logging); err != nil {
				return true, err
			}
		}
	} else if externalSyslogNG != nil {
		msg := fmt.Sprintf("refused to delete logging resource while syslogNGConfig %s exists", client.ObjectKeyFromObject(externalSyslogNG))
		r.EventRecorder.Event(logging, corev1.EventTypeWarning, "DeletionRefused", msg)
		return false, errors.New(msg)
	}

	if controllerutil.ContainsFinalizer(logging, syslogNGConfigFinalizer) && externalSyslogNG == nil {
		r.Log.Info("removing syslogngconfig finalizer")
		controllerutil.RemoveFinalizer(logging, syslogNGConfigFinalizer)
		if err := r.Update(ctx, logging); err != nil {
			return true, err
		}
	}

	return false, nil
}

func (r *LoggingReconciler) dynamicDefaults(ctx context.Context, log logr.Logger, syslogNGSpec *v1beta1.SyslogNGSpec) {
nodes := corev1.NodeList{}
if err := r.Client.List(ctx, &nodes); err != nil {
@@ -372,6 +442,7 @@ func (r *LoggingReconciler) clusterConfigurationSyslogNG(resources model.Logging
Path: syslogng.OutputSecretPath,
}

_, syslogngSpec := resources.GetSyslogNGSpec()
in := syslogngconfig.Input{
Name: resources.Logging.Name,
Namespace: resources.Logging.Namespace,
@@ -381,7 +452,7 @@ func (r *LoggingReconciler) clusterConfigurationSyslogNG(resources model.Logging
Flows: resources.SyslogNG.Flows,
SecretLoaderFactory: &slf,
SourcePort: syslogng.ServicePort,
SyslogNGSpec: resources.GetSyslogNGSpec(),
SyslogNGSpec: syslogngSpec,
}
var b strings.Builder
if err := syslogngconfig.RenderConfigInto(in, &b); err != nil {
5 changes: 3 additions & 2 deletions controllers/logging/logging_controller_test.go
@@ -41,12 +41,13 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"

controllers "github.com/kube-logging/logging-operator/controllers/logging"
"github.com/kube-logging/logging-operator/pkg/resources/fluentd"
"github.com/kube-logging/logging-operator/pkg/resources/model"
"github.com/kube-logging/logging-operator/pkg/sdk/logging/api/v1beta1"
"github.com/kube-logging/logging-operator/pkg/sdk/logging/model/output"
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
)

var (
@@ -1410,7 +1411,7 @@ func beforeEachWithError(t *testing.T, errors chan<- error) func() {
})
g.Expect(err).NotTo(gomega.HaveOccurred())

flowReconciler := controllers.NewLoggingReconciler(mgr.GetClient(), ctrl.Log.WithName("controllers").WithName("Flow"))
flowReconciler := controllers.NewLoggingReconciler(mgr.GetClient(), mgr.GetEventRecorderFor("logging-operator"), ctrl.Log.WithName("controllers").WithName("Flow"))

var stopped bool
wrappedReconciler := duplicateRequest(t, flowReconciler, &stopped, errors)
7 changes: 7 additions & 0 deletions docs/README.md
@@ -0,0 +1,7 @@
### Contents

This folder contains two major classes of documents:
- technical documentation snippets about various features of the operator
- documentation generated from code, under the [configuration](./configuration) folder

End-user documentation is available at https://kube-logging.dev
1 change: 0 additions & 1 deletion docs/Readme.md

This file was deleted.

11 changes: 0 additions & 11 deletions docs/examples/logging_logging_default_route.yaml

This file was deleted.

55 changes: 55 additions & 0 deletions docs/fluentbit-flow-control.md
@@ -0,0 +1,55 @@
## Flow control with durability in a multi-tenant setup

Resources:
- https://docs.fluentbit.io/manual/administration/backpressure
- https://docs.fluentbit.io/manual/administration/buffering-and-storage
- https://docs.fluentbit.io/manual/pipeline/inputs/tail#sqlite-and-write-ahead-logging
- https://docs.fluentbit.io/manual/administration/monitoring
- https://docs.fluentbit.io/manual/administration/troubleshooting#dump-internals-signal

### Context

Let's assume we have multiple separate inputs, each sending data to its own dedicated output (using tenant IDs in the tags).

### Durability

According to the referenced resources, we need `storage.type filesystem` for *every input*
where we want to avoid losing data. If we just enable this option, there will be no limit
on how much data fluent-bit keeps on disk.

> Note: we also have to configure the position db to avoid fluent-bit
> reading the same files from the beginning after a restart
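
For illustration, here is a minimal raw fluent-bit input sketch with filesystem buffering and a position database (the tag, path, and DB location are hypothetical; with the operator these settings correspond to fields such as `inputTail` and `positiondb` on the agent spec):

```
[INPUT]
    Name              tail
    Tag               tenant-a.*
    Path              /var/log/containers/*_tenant-a_*.log
    # keep chunks on disk so buffered data survives a restart
    storage.type      filesystem
    # position database so already-read file offsets survive a restart
    DB                /buffers/tenant-a.db
```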

### Memory limit

The limit applied by default is `storage.max_chunks_up 128` on the *service*, which is a global limit.
This only means that even when fluent-bit writes all chunks to disk, there is a limit on how many
chunks it can load back and handle in memory at the same time.
Without any further configuration, fluent-bit will keep writing chunks to disk indefinitely, and this setting
only affects the overall throughput.
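
As a sketch, the corresponding service-level settings (the `storage.path` value is an assumption; 128 is the default mentioned above):

```
[SERVICE]
    # directory where filesystem chunks are persisted
    storage.path          /buffers
    # global cap on how many chunks may be held in memory at once
    storage.max_chunks_up 128
```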

### Disk usage limit

If we want to limit the actual disk usage, we need to set `storage.total_limit_size` for
every *output* individually. This sounds good, but the problem with this option is that it doesn't
cause any backpressure; it simply starts discarding the oldest data, which obviously results in data loss,
so it should be used with care.
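
A hedged sketch of such a per-output cap (the host and size are illustrative):

```
[OUTPUT]
    Name                     forward
    Match                    tenant-a.*
    Host                     fluentd-tenant-a
    # per-output disk cap; beyond this the oldest chunks for this output are dropped
    storage.total_limit_size 1G
```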

### Backpressure

Backpressure can be enabled using `storage.pause_on_chunks_overlimit on` on the *input*, which is great, but with one important
caveat: the limit this setting uses as its trigger is `storage.max_chunks_up`, which is a global limit.
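
A sketch of the input-side switch (remember that the trigger is still the global `storage.max_chunks_up`, not a per-input limit):

```
[INPUT]
    Name                               tail
    Tag                                tenant-a.*
    storage.type                       filesystem
    # pause this input instead of loading more chunks once max_chunks_up is exceeded
    storage.pause_on_chunks_overlimit  on
```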

Going back to our main scenario, when one of the outputs is down (its tenant is down), chunks for that output start to pile up
on disk and in memory. When there are more than `storage.max_chunks_up` chunks in memory globally, fluent-bit pauses the inputs that
try to load additional chunks. It's not entirely clear how fluent-bit decides which input should be paused, but based on our
observations (using `config/samples/multitenant-routing`, for example) this works as expected: only the input that belongs
to the faulty output is paused, and when the output comes back online, the input is resumed immediately.

Also, based on fluent-bit's metrics, if an output is permanently down, the chunks waiting to be sent to that output
are not kept in memory, so other input/output pairs are not limited in throughput.

If we configure `storage.pause_on_chunks_overlimit` on the inputs, we can make sure the disk usage is bounded.

As long as pods are not restarting, backpressure can prevent log loss, but keep in mind that since the input is paused,
data in log files that the container runtime deletes during the output's downtime will be lost.
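
Putting the pieces together, a hedged two-tenant sketch (tenant names, paths, and sizes are illustrative; with the operator this configuration would normally be generated from `Logging`/`LoggingRoute` resources, as in the `multitenant-routing` sample):

```
[SERVICE]
    storage.path          /buffers
    storage.max_chunks_up 128

[INPUT]
    Name                               tail
    Tag                                tenant-a.*
    Path                               /var/log/containers/*_tenant-a_*.log
    DB                                 /buffers/tenant-a.db
    storage.type                       filesystem
    storage.pause_on_chunks_overlimit  on

[INPUT]
    Name                               tail
    Tag                                tenant-b.*
    Path                               /var/log/containers/*_tenant-b_*.log
    DB                                 /buffers/tenant-b.db
    storage.type                       filesystem
    storage.pause_on_chunks_overlimit  on

# each tenant's input is routed to its own output via the tag
[OUTPUT]
    Name                     forward
    Match                    tenant-a.*
    Host                     fluentd-tenant-a
    storage.total_limit_size 1G

[OUTPUT]
    Name                     forward
    Match                    tenant-b.*
    Host                     fluentd-tenant-b
    storage.total_limit_size 1G
```

If `fluentd-tenant-a` goes down, chunks for the first output accumulate until the global `storage.max_chunks_up` limit is reached, at which point only the tenant-a input is paused while tenant-b keeps flowing.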