From 73114ef3adee3a159ea5871041518996cb52bea2 Mon Sep 17 00:00:00 2001 From: Peter Wilcsinszky Date: Tue, 27 Feb 2024 12:39:10 +0100 Subject: [PATCH] isolate input sources using fixed hash tag and backpressure Signed-off-by: Peter Wilcsinszky --- .../logging/tenant-infra-logging.yaml | 7 +- docs/fluentbit-flow-control.md | 55 +++++++++++++ pkg/resources/fluentbit/config.go | 55 +++++++------ pkg/resources/fluentbit/configsecret.go | 29 +++++-- pkg/resources/fluentbit/tenants.go | 78 +++++++++++++------ 5 files changed, 163 insertions(+), 61 deletions(-) create mode 100644 docs/fluentbit-flow-control.md diff --git a/config/samples/multitenant-routing/logging/tenant-infra-logging.yaml b/config/samples/multitenant-routing/logging/tenant-infra-logging.yaml index 80960770a..d56fce83c 100644 --- a/config/samples/multitenant-routing/logging/tenant-infra-logging.yaml +++ b/config/samples/multitenant-routing/logging/tenant-infra-logging.yaml @@ -51,10 +51,6 @@ spec: loggingRef: infra inputTail: storage.type: filesystem - forwardOptions: - Workers: 0 - syslogng_output: - Workers: 0 positiondb: hostPath: path: "" @@ -63,6 +59,9 @@ spec: path: "" network: connectTimeout: 2 + metrics: {} + image: + tag: 2.1.8-debug --- apiVersion: logging.banzaicloud.io/v1beta1 kind: LoggingRoute diff --git a/docs/fluentbit-flow-control.md b/docs/fluentbit-flow-control.md new file mode 100644 index 000000000..25f14a47f --- /dev/null +++ b/docs/fluentbit-flow-control.md @@ -0,0 +1,55 @@ +## Flow control with durability in a multi tenant setup + +Resources: +- https://docs.fluentbit.io/manual/administration/backpressure +- https://docs.fluentbit.io/manual/administration/buffering-and-storage +- https://docs.fluentbit.io/manual/pipeline/inputs/tail#sqlite-and-write-ahead-logging +- https://docs.fluentbit.io/manual/administration/monitoring +- https://docs.fluentbit.io/manual/administration/troubleshooting#dump-internals-signal + +### Context + +Let's consider we have multiple separate inputs, each 
sending data to their respective dedicated outputs (using tenant ids in the tags).

### Durability

According to the referenced resources we need `storage.type filesystem` for *every input*
where we want to avoid losing data. If we just enable this option, there will be no limit
on how much data fluent-bit should keep on disk.

> Note: we also have to configure the position db to avoid fluent-bit
> reading the same files from the beginning after a restart

### Memory limit

The limit that is applied by default is `storage.max_chunks_up 128` on the *service* which is a global limit.
But this only means that even if fluent-bit writes all chunks to disk, there is a limit on how many
chunks it can read up and handle in memory at the same time.
Without any further configuration fluent-bit will write chunks to disk indefinitely and this setting will only
affect the overall throughput.

### Disk usage limit

In case we want to limit the actual disk usage we need to set `storage.total_limit_size` for
every *output* individually. This sounds good, but the problem with this option is that it doesn't
cause any backpressure, rather just starts to discard the oldest data, which obviously results in data loss,
so this option should be used with care.

### Backpressure

Backpressure can be enabled using `storage.pause_on_chunks_overlimit on` on the *input* which is great, but one important
caveat again: the limit this setting considers as the trigger event is `storage.max_chunks_up` which is a global limit.

Going back to our main scenario, when one of the outputs is down (tenant is down), chunks for that output start to pile up
on disk and in memory. When there are more than `storage.max_chunks_up` chunks in memory globally, fluent-bit pauses inputs that
try to load additional chunks. 
It's not clear how fluent-bit decides which output should be paused, but based on our +observations (using `config/samples/multitenant-routing` for example) this works as expected as only the input that belongs +to the faulty output is paused and when the output gets back online, the input is resumed immediately. + +Also based on fluent-bit's metrics, if an output is permanently down, the chunks that are waiting for that output to be sent +are not kept in memory, so other input/output pairs are not limited by the throughput. + +In case we configure `storage.pause_on_chunks_overlimit` in the inputs we can make sure the disk usage is bounded. + +As long as pods are not restarting, the backpressure can prevent log loss, but keep in mind, that since the input is paused, +data in log files that gets deleted by the container runtime during the output's downtime will get lost. diff --git a/pkg/resources/fluentbit/config.go b/pkg/resources/fluentbit/config.go index 644052a18..398cb9e44 100644 --- a/pkg/resources/fluentbit/config.go +++ b/pkg/resources/fluentbit/config.go @@ -55,21 +55,14 @@ var fluentBitConfigTemplate = ` {{- end }} {{- end }} -[INPUT] - Name tail - {{- range $key, $value := .Input.Values }} - {{- if $value }} - {{ $key }} {{$value}} - {{- end }} - {{- end }} - {{- range $id, $v := .Input.ParserN }} - {{- if $v }} - Parse_{{ $id}} {{$v}} - {{- end }} - {{- end }} - {{- if .Input.MultilineParser }} - multiline.parser {{- range $i, $v := .Input.MultilineParser }}{{ if $i }},{{ end}} {{ $v }}{{ end }} - {{- end }} +{{- if .Inputs }} +{{- range $input := .Inputs }} +# Tenant: {{ $input.Tenant }} +{{- template "input" $input }} +{{- end }} +{{- else }} +{{- template "input" .Input }} +{{- end }} {{- if not .DisableKubernetesFilter }} [FILTER] @@ -111,11 +104,7 @@ var fluentBitConfigTemplate = ` {{- range $target := $out.Targets }} [OUTPUT] Name forward - {{- if $target.AllNamespaces }} - Match * - {{- else }} - Match_Regex {{ $target.NamespaceRegex }} - {{- end 
}} + Match {{ $target.Match }} {{- if $out.Upstream.Enabled }} Upstream {{ $out.Upstream.Config.Path }} {{- else }} @@ -149,11 +138,7 @@ var fluentBitConfigTemplate = ` {{- range $target := $out.Targets }} [OUTPUT] Name tcp - {{- if $target.AllNamespaces }} - Match * - {{- else }} - Match_Regex {{ $target.NamespaceRegex }} - {{- end }} + Match {{ $target.Match }} Host {{ $target.Host }} Port {{ $target.Port }} Format json_lines @@ -203,6 +188,26 @@ var fluentbitNetworkTemplate = ` {{- end }} ` +var fluentbitInputTemplate = ` +{{- define "input" }} +[INPUT] + Name tail + {{- range $key, $value := .Values }} + {{- if $value }} + {{ $key }} {{$value}} + {{- end }} + {{- end }} + {{- range $id, $v := .ParserN }} + {{- if $v }} + Parse_{{ $id}} {{$v}} + {{- end }} + {{- end }} + {{- if .MultilineParser }} + multiline.parser {{- range $i, $v := .MultilineParser }}{{ if $i }},{{ end}} {{ $v }}{{ end }} + {{- end }} +{{- end }} +` + var upstreamConfigTemplate = ` [UPSTREAM] Name {{ .Config.Name }} diff --git a/pkg/resources/fluentbit/configsecret.go b/pkg/resources/fluentbit/configsecret.go index 7a3862ab1..50f98a338 100644 --- a/pkg/resources/fluentbit/configsecret.go +++ b/pkg/resources/fluentbit/configsecret.go @@ -38,6 +38,13 @@ type fluentbitInputConfig struct { MultilineParser []string } +type fluentbitInputConfigWithTenant struct { + Tenant string + Values map[string]string + ParserN []string + MultilineParser []string +} + type upstreamNode struct { Name string Host string @@ -63,6 +70,7 @@ type fluentBitConfig struct { CoroStackSize int32 Output map[string]string Input fluentbitInputConfig + Inputs []fluentbitInputConfigWithTenant DisableKubernetesFilter bool KubernetesFilter map[string]string AwsFilter map[string]string @@ -86,8 +94,8 @@ type fluentForwardOutputConfig struct { } type forwardTargetConfig struct { - AllNamespaces bool NamespaceRegex string + Match string Host string Port int32 } @@ -373,6 +381,9 @@ func (r *Reconciler) configSecret() 
(runtime.Object, reconciler.DesiredState, er for _, a := range loggingResources.LoggingRoutes { tenants = append(tenants, a.Status.Tenants...) } + if err := r.configureInputsForTenants(tenants, &input); err != nil { + return nil, nil, errors.WrapIf(err, "configuring inputs for target tenants") + } if err := r.configureOutputsForTenants(ctx, tenants, &input); err != nil { return nil, nil, errors.WrapIf(err, "configuring outputs for target tenants") } @@ -380,15 +391,15 @@ func (r *Reconciler) configSecret() (runtime.Object, reconciler.DesiredState, er // compatibility with existing configuration if input.FluentForwardOutput != nil { input.FluentForwardOutput.Targets = append(input.FluentForwardOutput.Targets, forwardTargetConfig{ - AllNamespaces: true, - Host: input.FluentForwardOutput.TargetHost, - Port: input.FluentForwardOutput.TargetPort, + Match: "*", + Host: input.FluentForwardOutput.TargetHost, + Port: input.FluentForwardOutput.TargetPort, }) } else if input.SyslogNGOutput != nil { input.SyslogNGOutput.Targets = append(input.SyslogNGOutput.Targets, forwardTargetConfig{ - AllNamespaces: true, - Host: input.SyslogNGOutput.Host, - Port: input.SyslogNGOutput.Port, + Match: "*", + Host: input.SyslogNGOutput.Host, + Port: input.SyslogNGOutput.Port, }) } } @@ -455,6 +466,10 @@ func generateConfig(input fluentBitConfig) (string, error) { if err != nil { return "", errors.WrapIf(err, "parsing fluentbit network nested template") } + tmpl, err = tmpl.Parse(fluentbitInputTemplate) + if err != nil { + return "", errors.WrapIf(err, "parsing fluentbit input nested template") + } err = tmpl.Execute(output, input) if err != nil { return "", errors.WrapIf(err, "executing fluentbit config template") diff --git a/pkg/resources/fluentbit/tenants.go b/pkg/resources/fluentbit/tenants.go index 11c2d322a..55139d146 100644 --- a/pkg/resources/fluentbit/tenants.go +++ b/pkg/resources/fluentbit/tenants.go @@ -16,12 +16,14 @@ package fluentbit import ( "context" + "crypto/sha256" + 
"encoding/hex" "fmt" "sort" "strings" "emperror.dev/errors" - "golang.org/x/exp/slices" + "golang.org/x/exp/maps" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client" @@ -71,19 +73,9 @@ func FindTenants(ctx context.Context, target metav1.LabelSelector, reader client } } - sort.Slice(tenants, func(i, j int) bool { + sort.SliceStable(tenants, func(i, j int) bool { return tenants[i].Name < tenants[j].Name }) - // Make sure our tenant list is stable - slices.SortStableFunc(tenants, func(a, b Tenant) int { - if a.Name < b.Name { - return -1 - } - if a.Name == b.Name { - return 0 - } - return 1 - }) return tenants, nil } @@ -91,11 +83,7 @@ func FindTenants(ctx context.Context, target metav1.LabelSelector, reader client func (r *Reconciler) configureOutputsForTenants(ctx context.Context, tenants []v1beta1.Tenant, input *fluentBitConfig) error { var errs error for _, t := range tenants { - allNamespaces := len(t.Namespaces) == 0 - namespaceRegex := `.` - if !allNamespaces { - namespaceRegex = fmt.Sprintf("^[^_]+_(%s)_", strings.Join(t.Namespaces, "|")) - } + match := fmt.Sprintf("kubernetes.%s.*", hashFromTenantName(t.Name)) logging := &v1beta1.Logging{} if err := r.resourceReconciler.Client.Get(ctx, types.NamespacedName{Name: t.Name}, logging); err != nil { return errors.WrapIf(err, "getting logging resource") @@ -113,20 +101,18 @@ func (r *Reconciler) configureOutputsForTenants(ctx context.Context, tenants []v input.FluentForwardOutput = &fluentForwardOutputConfig{} } input.FluentForwardOutput.Targets = append(input.FluentForwardOutput.Targets, forwardTargetConfig{ - AllNamespaces: allNamespaces, - NamespaceRegex: namespaceRegex, - Host: aggregatorEndpoint(logging, fluentd.ServiceName), - Port: fluentd.ServicePort, + Match: match, + Host: aggregatorEndpoint(logging, fluentd.ServiceName), + Port: fluentd.ServicePort, }) } else if _, syslogNGSPec := loggingResources.GetSyslogNGSpec(); syslogNGSPec != nil 
{ if input.SyslogNGOutput == nil { input.SyslogNGOutput = newSyslogNGOutputConfig() } input.SyslogNGOutput.Targets = append(input.SyslogNGOutput.Targets, forwardTargetConfig{ - AllNamespaces: allNamespaces, - NamespaceRegex: namespaceRegex, - Host: aggregatorEndpoint(logging, syslogng.ServiceName), - Port: syslogng.ServicePort, + Match: match, + Host: aggregatorEndpoint(logging, syslogng.ServiceName), + Port: syslogng.ServicePort, }) } else { errs = errors.Append(errs, errors.Errorf("logging %s does not provide any aggregator configured", t.Name)) @@ -134,3 +120,45 @@ func (r *Reconciler) configureOutputsForTenants(ctx context.Context, tenants []v } return errs } + +func (r *Reconciler) configureInputsForTenants(tenants []v1beta1.Tenant, input *fluentBitConfig) error { + var errs error + for _, t := range tenants { + allNamespaces := len(t.Namespaces) == 0 + tenantValues := maps.Clone(input.Input.Values) + if !allNamespaces { + var paths []string + for _, n := range t.Namespaces { + paths = append(paths, fmt.Sprintf("/var/log/containers/*_%s_*.log", n)) + } + tenantValues["Path"] = strings.Join(paths, ",") + } else { + tenantValues["Path"] = "/var/log/containers/*.log" + } + + tenantValues["DB"] = fmt.Sprintf("/tail-db/tail-containers-state-%s.db", t.Name) + tenantValues["Tag"] = fmt.Sprintf("kubernetes.%s.*", hashFromTenantName(t.Name)) + // This helps to make sure we apply backpressure on the input, see https://docs.fluentbit.io/manual/administration/backpressure + tenantValues["storage.pause_on_chunks_overlimit"] = "on" + input.Inputs = append(input.Inputs, fluentbitInputConfigWithTenant{ + Tenant: t.Name, + Values: tenantValues, + ParserN: input.Input.ParserN, + MultilineParser: input.Input.MultilineParser, + }) + } + // the regex will work only if we cut the prefix off. 
fluentbit doesn't care about the content, just the length + input.KubernetesFilter["Kube_Tag_Prefix"] = `kubernetes.0000000000.var.log.containers.` + return errs +} + +func hashFromTenantName(input string) string { + hasher := sha256.New() + hasher.Write([]byte(input)) + hashBytes := hasher.Sum(nil) + + // Convert the hash to a hex string + hashString := hex.EncodeToString(hashBytes) + + return hashString[0:10] +}