Skip to content

Commit

Permalink
Merge pull request #1682 from kube-logging/backpressure-isolate
Browse files Browse the repository at this point in the history
Isolate input sources to avoid backpressure affecting other tenants
  • Loading branch information
OverOrion authored Mar 4, 2024
2 parents 6449512 + d3199d8 commit ecb06ab
Show file tree
Hide file tree
Showing 5 changed files with 162 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,6 @@ spec:
loggingRef: infra
inputTail:
storage.type: filesystem
forwardOptions:
Workers: 0
syslogng_output:
Workers: 0
positiondb:
hostPath:
path: ""
Expand All @@ -63,6 +59,9 @@ spec:
path: ""
network:
connectTimeout: 2
metrics: {}
image:
tag: 2.1.8-debug
---
apiVersion: logging.banzaicloud.io/v1beta1
kind: LoggingRoute
Expand Down
55 changes: 55 additions & 0 deletions docs/fluentbit-flow-control.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
## Flow control with durability in a multi tenant setup

Resources:
- https://docs.fluentbit.io/manual/administration/backpressure
- https://docs.fluentbit.io/manual/administration/buffering-and-storage
- https://docs.fluentbit.io/manual/pipeline/inputs/tail#sqlite-and-write-ahead-logging
- https://docs.fluentbit.io/manual/administration/monitoring
- https://docs.fluentbit.io/manual/administration/troubleshooting#dump-internals-signal

### Context

Let's consider we have multiple separate inputs, each sending data to their respective dedicated outputs (using tenant ids in the tags).

### Durability

According to the referenced resources we need `storage.type filesystem` for *every input*
where we want to avoid losing data. If we just enable this option, there will be no limit
on how many data fluent-bit should keep on disk.

> Note: we also have to configure the position db to avoid fluent-bit
> reading the same files from the beginning after a restart
### Memory limit

The limit that is applied by default is `storage.max_chunks_up 128` on the *service* which is a global limit.
But this only means, that even if fluent-bit writes all chunks to disk, there is a limit on how many
chunks it can read up and handle in memory at the same time.
Without any further configuration fluent-bit will write chunks to disk indefinitely and this setting will only
affect the overall throughput.

### Disk usage limit

In case we want to limit the actual disk usage we need to set `storage.total_limit_size` for
every *output* individually. This sounds good, but the problem with this option is that it doesn't
cause any backpressure, rather just starts to discard the oldest data, which obviously results in data loss,
so this option should be used with care.

### Backpressure

Backpressure can be enabled using `storage.pause_on_chunks_overlimit on` on the *input* which is great, but one important
caveat again: the limit this setting considers as the trigger event is `storage.max_chunks_up` which is a global limit.

Going back to our main scenario, when one of the outputs is down (tenant is down), chunks for that output start to pile up
on disk and in memory. When there are more than `storage.max_chunks_up` chunks in memory globally, fluent-bit pauses inputs that
tries to load additional chunks. It's not clear how fluent-bit decides which output should be paused, but based on our
observations (using `config/samples/multitenant-routing` for example) this works as expected as only the input that belongs
to the faulty output is paused and when the output gets back online, the input is resumed immediately.

Also based on fluent-bit's metrics, if an output is permanently down, the chunks that are waiting for that output to be sent
are not kept in memory, so other input/output pairs are not limited by the throughput.

In case we configure `storage.pause_on_chunks_overlimit` in the inputs we can make sure the disk usage is bounded.

As long as pods are not restarting, the backpressure can prevent log loss, but keep in mind, that since the input is paused,
data in log files that gets deleted by the container runtime during the output's downtime will get lost.
55 changes: 30 additions & 25 deletions pkg/resources/fluentbit/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,21 +55,14 @@ var fluentBitConfigTemplate = `
{{- end }}
{{- end }}
[INPUT]
Name tail
{{- range $key, $value := .Input.Values }}
{{- if $value }}
{{ $key }} {{$value}}
{{- end }}
{{- end }}
{{- range $id, $v := .Input.ParserN }}
{{- if $v }}
Parse_{{ $id}} {{$v}}
{{- end }}
{{- end }}
{{- if .Input.MultilineParser }}
multiline.parser {{- range $i, $v := .Input.MultilineParser }}{{ if $i }},{{ end}} {{ $v }}{{ end }}
{{- end }}
{{- if .Inputs }}
{{- range $input := .Inputs }}
# Tenant: {{ $input.Tenant }}
{{- template "input" $input }}
{{- end }}
{{- else }}
{{- template "input" .Input }}
{{- end }}
{{- if not .DisableKubernetesFilter }}
[FILTER]
Expand Down Expand Up @@ -111,11 +104,7 @@ var fluentBitConfigTemplate = `
{{- range $target := $out.Targets }}
[OUTPUT]
Name forward
{{- if $target.AllNamespaces }}
Match *
{{- else }}
Match_Regex {{ $target.NamespaceRegex }}
{{- end }}
Match {{ $target.Match }}
{{- if $out.Upstream.Enabled }}
Upstream {{ $out.Upstream.Config.Path }}
{{- else }}
Expand Down Expand Up @@ -149,11 +138,7 @@ var fluentBitConfigTemplate = `
{{- range $target := $out.Targets }}
[OUTPUT]
Name tcp
{{- if $target.AllNamespaces }}
Match *
{{- else }}
Match_Regex {{ $target.NamespaceRegex }}
{{- end }}
Match {{ $target.Match }}
Host {{ $target.Host }}
Port {{ $target.Port }}
Format json_lines
Expand Down Expand Up @@ -203,6 +188,26 @@ var fluentbitNetworkTemplate = `
{{- end }}
`

var fluentbitInputTemplate = `
{{- define "input" }}
[INPUT]
Name tail
{{- range $key, $value := .Values }}
{{- if $value }}
{{ $key }} {{$value}}
{{- end }}
{{- end }}
{{- range $id, $v := .ParserN }}
{{- if $v }}
Parse_{{ $id}} {{$v}}
{{- end }}
{{- end }}
{{- if .MultilineParser }}
multiline.parser {{- range $i, $v := .MultilineParser }}{{ if $i }},{{ end}} {{ $v }}{{ end }}
{{- end }}
{{- end }}
`

var upstreamConfigTemplate = `
[UPSTREAM]
Name {{ .Config.Name }}
Expand Down
29 changes: 22 additions & 7 deletions pkg/resources/fluentbit/configsecret.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,13 @@ type fluentbitInputConfig struct {
MultilineParser []string
}

type fluentbitInputConfigWithTenant struct {
Tenant string
Values map[string]string
ParserN []string
MultilineParser []string
}

type upstreamNode struct {
Name string
Host string
Expand All @@ -63,6 +70,7 @@ type fluentBitConfig struct {
CoroStackSize int32
Output map[string]string
Input fluentbitInputConfig
Inputs []fluentbitInputConfigWithTenant
DisableKubernetesFilter bool
KubernetesFilter map[string]string
AwsFilter map[string]string
Expand All @@ -86,8 +94,8 @@ type fluentForwardOutputConfig struct {
}

type forwardTargetConfig struct {
AllNamespaces bool
NamespaceRegex string
Match string
Host string
Port int32
}
Expand Down Expand Up @@ -372,22 +380,25 @@ func (r *Reconciler) configSecret() (runtime.Object, reconciler.DesiredState, er
for _, a := range loggingResources.LoggingRoutes {
tenants = append(tenants, a.Status.Tenants...)
}
if err := r.configureInputsForTenants(tenants, &input); err != nil {
return nil, nil, errors.WrapIf(err, "configuring inputs for target tenants")
}
if err := r.configureOutputsForTenants(ctx, tenants, &input); err != nil {
return nil, nil, errors.WrapIf(err, "configuring outputs for target tenants")
}
} else {
// compatibility with existing configuration
if input.FluentForwardOutput != nil {
input.FluentForwardOutput.Targets = append(input.FluentForwardOutput.Targets, forwardTargetConfig{
AllNamespaces: true,
Host: input.FluentForwardOutput.TargetHost,
Port: input.FluentForwardOutput.TargetPort,
Match: "*",
Host: input.FluentForwardOutput.TargetHost,
Port: input.FluentForwardOutput.TargetPort,
})
} else if input.SyslogNGOutput != nil {
input.SyslogNGOutput.Targets = append(input.SyslogNGOutput.Targets, forwardTargetConfig{
AllNamespaces: true,
Host: input.SyslogNGOutput.Host,
Port: input.SyslogNGOutput.Port,
Match: "*",
Host: input.SyslogNGOutput.Host,
Port: input.SyslogNGOutput.Port,
})
}
}
Expand Down Expand Up @@ -454,6 +465,10 @@ func generateConfig(input fluentBitConfig) (string, error) {
if err != nil {
return "", errors.WrapIf(err, "parsing fluentbit network nested template")
}
tmpl, err = tmpl.Parse(fluentbitInputTemplate)
if err != nil {
return "", errors.WrapIf(err, "parsing fluentbit input nested template")
}
err = tmpl.Execute(output, input)
if err != nil {
return "", errors.WrapIf(err, "executing fluentbit config template")
Expand Down
65 changes: 52 additions & 13 deletions pkg/resources/fluentbit/tenants.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,14 @@ package fluentbit

import (
"context"
"crypto/sha256"
"encoding/hex"
"fmt"
"sort"
"strings"

"emperror.dev/errors"
"golang.org/x/exp/maps"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -80,11 +83,7 @@ func FindTenants(ctx context.Context, target metav1.LabelSelector, reader client
func (r *Reconciler) configureOutputsForTenants(ctx context.Context, tenants []v1beta1.Tenant, input *fluentBitConfig) error {
var errs error
for _, t := range tenants {
allNamespaces := len(t.Namespaces) == 0
namespaceRegex := `.`
if !allNamespaces {
namespaceRegex = fmt.Sprintf("^[^_]+_(%s)_", strings.Join(t.Namespaces, "|"))
}
match := fmt.Sprintf("kubernetes.%s.*", hashFromTenantName(t.Name))
logging := &v1beta1.Logging{}
if err := r.resourceReconciler.Client.Get(ctx, types.NamespacedName{Name: t.Name}, logging); err != nil {
return errors.WrapIf(err, "getting logging resource")
Expand All @@ -101,24 +100,64 @@ func (r *Reconciler) configureOutputsForTenants(ctx context.Context, tenants []v
input.FluentForwardOutput = &fluentForwardOutputConfig{}
}
input.FluentForwardOutput.Targets = append(input.FluentForwardOutput.Targets, forwardTargetConfig{
AllNamespaces: allNamespaces,
NamespaceRegex: namespaceRegex,
Host: aggregatorEndpoint(logging, fluentd.ServiceName),
Port: fluentd.ServicePort,
Match: match,
Host: aggregatorEndpoint(logging, fluentd.ServiceName),
Port: fluentd.ServicePort,
})
} else if loggingResources.GetSyslogNGSpec() != nil {
if input.SyslogNGOutput == nil {
input.SyslogNGOutput = newSyslogNGOutputConfig()
}
input.SyslogNGOutput.Targets = append(input.SyslogNGOutput.Targets, forwardTargetConfig{
AllNamespaces: allNamespaces,
NamespaceRegex: namespaceRegex,
Host: aggregatorEndpoint(logging, syslogng.ServiceName),
Port: syslogng.ServicePort,
Match: match,
Host: aggregatorEndpoint(logging, syslogng.ServiceName),
Port: syslogng.ServicePort,
})
} else {
errs = errors.Append(errs, errors.Errorf("logging %s does not provide any aggregator configured", t.Name))
}
}
return errs
}

func (r *Reconciler) configureInputsForTenants(tenants []v1beta1.Tenant, input *fluentBitConfig) error {
var errs error
for _, t := range tenants {
allNamespaces := len(t.Namespaces) == 0
tenantValues := maps.Clone(input.Input.Values)
if !allNamespaces {
var paths []string
for _, n := range t.Namespaces {
paths = append(paths, fmt.Sprintf("/var/log/containers/*_%s_*.log", n))
}
tenantValues["Path"] = strings.Join(paths, ",")
} else {
tenantValues["Path"] = "/var/log/containers/*.log"
}

tenantValues["DB"] = fmt.Sprintf("/tail-db/tail-containers-state-%s.db", t.Name)
tenantValues["Tag"] = fmt.Sprintf("kubernetes.%s.*", hashFromTenantName(t.Name))
// This helps to make sure we apply backpressure on the input, see https://docs.fluentbit.io/manual/administration/backpressure
tenantValues["storage.pause_on_chunks_overlimit"] = "on"
input.Inputs = append(input.Inputs, fluentbitInputConfigWithTenant{
Tenant: t.Name,
Values: tenantValues,
ParserN: input.Input.ParserN,
MultilineParser: input.Input.MultilineParser,
})
}
// the regex will work only if we cut the prefix off. fluentbit doesn't care about the content, just the length
input.KubernetesFilter["Kube_Tag_Prefix"] = `kubernetes.0000000000.var.log.containers.`
return errs
}

func hashFromTenantName(input string) string {
hasher := sha256.New()
hasher.Write([]byte(input))
hashBytes := hasher.Sum(nil)

// Convert the hash to a hex string
hashString := hex.EncodeToString(hashBytes)

return hashString[0:10]
}

0 comments on commit ecb06ab

Please sign in to comment.