Skip to content

Commit

Permalink
Merge pull request #1888 from kube-logging/forward-port-release-4.10
Browse files Browse the repository at this point in the history
forward port release 4.10
  • Loading branch information
pepov authored Dec 17, 2024
2 parents 096a5a0 + 45110de commit 0a15baf
Show file tree
Hide file tree
Showing 19 changed files with 125 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1478,6 +1478,8 @@ spec:
flush:
format: int32
type: integer
forceHotReloadAfterGrace:
type: boolean
forwardOptions:
properties:
Require_ack_response:
Expand Down Expand Up @@ -1585,6 +1587,8 @@ spec:
items:
type: string
type: array
storage.pause_on_chunks_overlimit:
type: string
storage.type:
type: string
type: object
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2525,6 +2525,8 @@ spec:
flush:
format: int32
type: integer
forceHotReloadAfterGrace:
type: boolean
forwardOptions:
properties:
Require_ack_response:
Expand Down Expand Up @@ -2632,6 +2634,8 @@ spec:
items:
type: string
type: array
storage.pause_on_chunks_overlimit:
type: string
storage.type:
type: string
type: object
Expand Down Expand Up @@ -12185,6 +12189,8 @@ spec:
items:
type: string
type: array
storage.pause_on_chunks_overlimit:
type: string
storage.type:
type: string
type: object
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4066,6 +4066,8 @@ spec:
items:
type: string
type: array
storage.pause_on_chunks_overlimit:
type: string
storage.type:
type: string
type: object
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1475,6 +1475,8 @@ spec:
flush:
format: int32
type: integer
forceHotReloadAfterGrace:
type: boolean
forwardOptions:
properties:
Require_ack_response:
Expand Down Expand Up @@ -1582,6 +1584,8 @@ spec:
items:
type: string
type: array
storage.pause_on_chunks_overlimit:
type: string
storage.type:
type: string
type: object
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2522,6 +2522,8 @@ spec:
flush:
format: int32
type: integer
forceHotReloadAfterGrace:
type: boolean
forwardOptions:
properties:
Require_ack_response:
Expand Down Expand Up @@ -2629,6 +2631,8 @@ spec:
items:
type: string
type: array
storage.pause_on_chunks_overlimit:
type: string
storage.type:
type: string
type: object
Expand Down Expand Up @@ -12182,6 +12186,8 @@ spec:
items:
type: string
type: array
storage.pause_on_chunks_overlimit:
type: string
storage.type:
type: string
type: object
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4063,6 +4063,8 @@ spec:
items:
type: string
type: array
storage.pause_on_chunks_overlimit:
type: string
storage.type:
type: string
type: object
Expand Down
4 changes: 4 additions & 0 deletions config/crd/bases/logging.banzaicloud.io_fluentbitagents.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1475,6 +1475,8 @@ spec:
flush:
format: int32
type: integer
forceHotReloadAfterGrace:
type: boolean
forwardOptions:
properties:
Require_ack_response:
Expand Down Expand Up @@ -1582,6 +1584,8 @@ spec:
items:
type: string
type: array
storage.pause_on_chunks_overlimit:
type: string
storage.type:
type: string
type: object
Expand Down
6 changes: 6 additions & 0 deletions config/crd/bases/logging.banzaicloud.io_loggings.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2522,6 +2522,8 @@ spec:
flush:
format: int32
type: integer
forceHotReloadAfterGrace:
type: boolean
forwardOptions:
properties:
Require_ack_response:
Expand Down Expand Up @@ -2629,6 +2631,8 @@ spec:
items:
type: string
type: array
storage.pause_on_chunks_overlimit:
type: string
storage.type:
type: string
type: object
Expand Down Expand Up @@ -12182,6 +12186,8 @@ spec:
items:
type: string
type: array
storage.pause_on_chunks_overlimit:
type: string
storage.type:
type: string
type: object
Expand Down
2 changes: 2 additions & 0 deletions config/crd/bases/logging.banzaicloud.io_nodeagents.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4063,6 +4063,8 @@ spec:
items:
type: string
type: array
storage.pause_on_chunks_overlimit:
type: string
storage.type:
type: string
type: object
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ metadata:
tenant: infra
spec:
loggingRef: infra
fluentd: {}
fluentd:
metrics: {}
controlNamespace: infra
---
apiVersion: logging.banzaicloud.io/v1beta1
Expand Down Expand Up @@ -49,8 +50,12 @@ metadata:
name: infra
spec:
loggingRef: infra
# this is required to reload even if there are pending tasks in one of the queues
# requires grace to be set, which is 5 by default
forceHotReloadAfterGrace: true
inputTail:
storage.type: filesystem
storage.pause_on_chunks_overlimit: "off"
positiondb:
hostPath:
path: ""
Expand All @@ -59,9 +64,15 @@ spec:
path: ""
network:
connectTimeout: 2
keepaliveMaxRecycle: 20
metrics: {}
bufferStorage:
storage.max_chunks_up: 10
forwardOptions:
storage.total_limit_size: 50MB
image:
tag: 2.2.2-debug
tag: 3.1.10-debug
configHotReload: {}
---
apiVersion: logging.banzaicloud.io/v1beta1
kind: LoggingRoute
Expand Down
2 changes: 2 additions & 0 deletions controllers/logging/logging_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ type LoggingReconciler struct {
func (r *LoggingReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
log := r.Log.WithValues("logging", req.Name)

log.V(1).Info("reconciling")

var logging loggingv1beta1.Logging
if err := r.Client.Get(ctx, req.NamespacedName, &logging); err != nil {
// If object is not found, return without error.
Expand Down
11 changes: 11 additions & 0 deletions docs/configuration/crds/v1beta1/fluentbit_types.md
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,11 @@ Set the flush time in seconds.nanoseconds. The engine loop uses a Flush timeout

Default: 1

### forceHotReloadAfterGrace (bool, optional) {#fluentbitspec-forcehotreloadaftergrace}

HotReload pauses all inputs and waits until they finish. In certain situations this is unacceptable, for example if an output is down for a longer time. An undocumented option called "Hot_Reload.Ensure_Thread_Safety Off" can be used at the [SERVICE] config to force hotreload after the grace period. Please note that it might result in a SIGSEGV, but worst case kubelet will restart the container. See https://github.com/fluent/fluent-bit/pull/7509


### forwardOptions (*ForwardOptions, optional) {#fluentbitspec-forwardoptions}


Expand Down Expand Up @@ -557,6 +562,12 @@ When a monitored file reach it buffer capacity due to a very long line (Buffer_M

Default: Off

### storage.pause_on_chunks_overlimit (string, optional) {#inputtail-storage.pause_on_chunks_overlimit}

Specifies whether to pause or drop data when the buffer is full. This helps to make sure we apply backpressure on the input if enabled, see https://docs.fluentbit.io/manual/administration/backpressure

Default: on

### storage.type (string, optional) {#inputtail-storage.type}

Specify the buffering mechanism to use. It can be memory or filesystem.
Expand Down
15 changes: 13 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"runtime/coverage"
"strings"
"syscall"
"time"

"emperror.dev/errors"
prometheusOperator "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1"
Expand Down Expand Up @@ -81,6 +82,7 @@ func main() {
var namespace string
var loggingRef string
var klogLevel int
var syncPeriod string

flag.StringVar(&metricsAddr, "metrics-addr", ":8080", "The address the metric endpoint binds to.")
flag.BoolVar(&enableLeaderElection, "enable-leader-election", false,
Expand All @@ -91,6 +93,7 @@ func main() {
flag.BoolVar(&enableprofile, "pprof", false, "Enable pprof")
flag.StringVar(&namespace, "watch-namespace", "", "Namespace to filter the list of watched objects")
flag.StringVar(&loggingRef, "watch-logging-name", "", "Logging resource name to optionally filter the list of watched objects based on which logging they belong to by checking the app.kubernetes.io/managed-by label")
flag.StringVar(&syncPeriod, "sync-period", "", "SyncPeriod determines the minimum frequency at which watched resources are reconciled. Defaults to 10 hours. Parsed using time.ParseDuration.")
flag.Parse()

ctx := context.Background()
Expand Down Expand Up @@ -142,7 +145,7 @@ func main() {
mgrOptions.WebhookServer = webhookServer
}

customMgrOptions, err := setupCustomCache(&mgrOptions, namespace, loggingRef)
customMgrOptions, err := setupCustomCache(&mgrOptions, syncPeriod, namespace, loggingRef)
if err != nil {
setupLog.Error(err, "unable to set up custom cache settings")
os.Exit(1)
Expand Down Expand Up @@ -285,7 +288,15 @@ func detectContainerRuntime(ctx context.Context, c client.Reader) error {
return nil
}

func setupCustomCache(mgrOptions *ctrl.Options, namespace string, loggingRef string) (*ctrl.Options, error) {
func setupCustomCache(mgrOptions *ctrl.Options, syncPeriod string, namespace string, loggingRef string) (*ctrl.Options, error) {
if syncPeriod != "" {
duration, err := time.ParseDuration(syncPeriod)
if err != nil {
return mgrOptions, err
}
mgrOptions.Cache.SyncPeriod = &duration
}

if namespace == "" && loggingRef == "" {
return mgrOptions, nil
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/resources/fluentbit/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ var fluentBitConfigTemplate = `
[SERVICE]
Flush {{ .Flush }}
Grace {{ .Grace }}
{{- if .ForceHotReloadAfterGrace }}
Hot_Reload.Ensure_Thread_Safety off
{{- end }}
Daemon Off
Log_Level {{ .LogLevel }}
Parsers_File {{ .DefaultParsers }}
Expand Down
52 changes: 27 additions & 25 deletions pkg/resources/fluentbit/configsecret.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,23 +64,24 @@ type fluentBitConfig struct {
Port int32
Path string
}
Flush int32
Grace int32
LogLevel string
CoroStackSize int32
Output map[string]string
Input fluentbitInputConfig
Inputs []fluentbitInputConfigWithTenant
DisableKubernetesFilter bool
KubernetesFilter map[string]string
AwsFilter map[string]string
BufferStorage map[string]string
FilterModify []v1beta1.FilterModify
FluentForwardOutput *fluentForwardOutputConfig
SyslogNGOutput *syslogNGOutputConfig
DefaultParsers string
CustomParsers string
HealthCheck *v1beta1.HealthCheck
Flush int32
Grace int32
LogLevel string
CoroStackSize int32
Output map[string]string
ForceHotReloadAfterGrace bool
Input fluentbitInputConfig
Inputs []fluentbitInputConfigWithTenant
DisableKubernetesFilter bool
KubernetesFilter map[string]string
AwsFilter map[string]string
BufferStorage map[string]string
FilterModify []v1beta1.FilterModify
FluentForwardOutput *fluentForwardOutputConfig
SyslogNGOutput *syslogNGOutputConfig
DefaultParsers string
CustomParsers string
HealthCheck *v1beta1.HealthCheck
}

type fluentForwardOutputConfig struct {
Expand Down Expand Up @@ -213,14 +214,15 @@ func (r *Reconciler) configSecret() (runtime.Object, reconciler.DesiredState, er
}

input := fluentBitConfig{
Flush: r.fluentbitSpec.Flush,
Grace: r.fluentbitSpec.Grace,
LogLevel: r.fluentbitSpec.LogLevel,
CoroStackSize: r.fluentbitSpec.CoroStackSize,
Namespace: r.Logging.Spec.ControlNamespace,
DisableKubernetesFilter: disableKubernetesFilter,
FilterModify: r.fluentbitSpec.FilterModify,
HealthCheck: r.fluentbitSpec.HealthCheck,
Flush: r.fluentbitSpec.Flush,
Grace: r.fluentbitSpec.Grace,
ForceHotReloadAfterGrace: r.fluentbitSpec.ForceHotReloadAfterGrace,
LogLevel: r.fluentbitSpec.LogLevel,
CoroStackSize: r.fluentbitSpec.CoroStackSize,
Namespace: r.Logging.Spec.ControlNamespace,
DisableKubernetesFilter: disableKubernetesFilter,
FilterModify: r.fluentbitSpec.FilterModify,
HealthCheck: r.fluentbitSpec.HealthCheck,
}

input.DefaultParsers = fmt.Sprintf("%s/%s", StockConfigPath, "parsers.conf")
Expand Down
2 changes: 0 additions & 2 deletions pkg/resources/fluentbit/tenants.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,6 @@ func (r *Reconciler) configureInputsForTenants(tenants []v1beta1.Tenant, input *

tenantValues["DB"] = fmt.Sprintf("/tail-db/tail-containers-state-%s.db", t.Name)
tenantValues["Tag"] = fmt.Sprintf("kubernetes.%s.*", hashFromTenantName(t.Name))
// This helps to make sure we apply backpressure on the input, see https://docs.fluentbit.io/manual/administration/backpressure
tenantValues["storage.pause_on_chunks_overlimit"] = "on"
input.Inputs = append(input.Inputs, fluentbitInputConfigWithTenant{
Tenant: t.Name,
Values: tenantValues,
Expand Down
Loading

0 comments on commit 0a15baf

Please sign in to comment.