Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/release-4.5'
Browse files Browse the repository at this point in the history
Signed-off-by: Szilard Parrag <[email protected]>
  • Loading branch information
OverOrion committed Mar 4, 2024
2 parents ecb06ab + db85ac6 commit 0302e73
Show file tree
Hide file tree
Showing 22 changed files with 261 additions and 133 deletions.
6 changes: 6 additions & 0 deletions charts/logging-operator/templates/clusterrole.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,12 @@ rules:
- get
- patch
- update
- apiGroups:
- logging.banzaicloud.io
resources:
- loggings/finalizers
verbs:
- update
- apiGroups:
- logging.banzaicloud.io
resources:
Expand Down
6 changes: 6 additions & 0 deletions config/rbac/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,12 @@ rules:
- get
- patch
- update
- apiGroups:
- logging.banzaicloud.io
resources:
- loggings/finalizers
verbs:
- update
- apiGroups:
- logging.banzaicloud.io
resources:
Expand Down
18 changes: 18 additions & 0 deletions config/samples/fluentdconfig.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
apiVersion: v1
kind: Namespace
metadata:
name: logging
---
apiVersion: logging.banzaicloud.io/v1beta1
kind: Logging
metadata:
name: fluentd-config
spec:
controlNamespace: logging
---
apiVersion: logging.banzaicloud.io/v1beta1
kind: FluentdConfig
metadata:
name: fluentd-config
namespace: logging
spec: {}
97 changes: 84 additions & 13 deletions controllers/logging/logging_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,10 @@ import (
"github.com/prometheus/client_golang/prometheus"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/metrics"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
Expand All @@ -52,23 +54,26 @@ import (
)

// NewLoggingReconciler returns a new LoggingReconciler instance
func NewLoggingReconciler(client client.Client, log logr.Logger) *LoggingReconciler {
func NewLoggingReconciler(client client.Client, eventRecorder record.EventRecorder, log logr.Logger) *LoggingReconciler {
return &LoggingReconciler{
Client: client,
Log: log,
Client: client,
EventRecorder: eventRecorder,
Log: log,
}
}

// LoggingReconciler reconciles a Logging object
type LoggingReconciler struct {
client.Client
Log logr.Logger
EventRecorder record.EventRecorder
Log logr.Logger
}

// +kubebuilder:rbac:groups=logging.banzaicloud.io,resources=loggings;fluentbitagents;flows;clusterflows;outputs;clusteroutputs;nodeagents;fluentdconfigs;syslogngconfigs,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=logging.banzaicloud.io,resources=loggings/status;fluentbitagents/status;flows/status;clusterflows/status;outputs/status;clusteroutputs/status;nodeagents/status;fluentdconfigs/status;syslogngconfigs/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=logging.banzaicloud.io,resources=syslogngflows;syslogngclusterflows;syslogngoutputs;syslogngclusteroutputs,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=logging.banzaicloud.io,resources=syslogngflows/status;syslogngclusterflows/status;syslogngoutputs/status;syslogngclusteroutputs/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=logging.banzaicloud.io,resources=loggings/finalizers,verbs=update
// +kubebuilder:rbac:groups="",resources=configmaps;secrets,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=extensions;apps,resources=deployments,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=extensions;networking.k8s.io,resources=ingresses,verbs=get;list;watch;create;update;patch;delete
Expand Down Expand Up @@ -128,7 +133,8 @@ func (r *LoggingReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
return reconcile.Result{}, errors.WrapIfWithDetails(err, "failed to get logging resources", "logging", logging)
}

r.dynamicDefaults(ctx, log, loggingResources.GetSyslogNGSpec())
_, syslogNGSPec := loggingResources.GetSyslogNGSpec()
r.dynamicDefaults(ctx, log, syslogNGSPec)

// metrics
defer func() {
Expand Down Expand Up @@ -177,7 +183,7 @@ func (r *LoggingReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct

var loggingDataProvider loggingdataprovider.LoggingDataProvider

fluentdSpec := loggingResources.GetFluentdSpec()
fluentdExternal, fluentdSpec := loggingResources.GetFluentd()
if fluentdSpec != nil {
fluentdConfig, secretList, err := r.clusterConfigurationFluentd(loggingResources)
if err != nil {
Expand All @@ -188,12 +194,12 @@ func (r *LoggingReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
} else {
log.V(1).Info("flow configuration", "config", fluentdConfig)

reconcilers = append(reconcilers, fluentd.New(r.Client, r.Log, &logging, fluentdSpec, &fluentdConfig, secretList, reconcilerOpts).Reconcile)
reconcilers = append(reconcilers, fluentd.New(r.Client, r.Log, &logging, fluentdSpec, fluentdExternal, &fluentdConfig, secretList, reconcilerOpts).Reconcile)
}
loggingDataProvider = fluentd.NewDataProvider(r.Client, &logging, fluentdSpec)
loggingDataProvider = fluentd.NewDataProvider(r.Client, &logging, fluentdSpec, fluentdExternal)
}

syslogNGSpec := loggingResources.GetSyslogNGSpec()
syslogNGExternal, syslogNGSpec := loggingResources.GetSyslogNGSpec()
if syslogNGSpec != nil {
syslogNGConfig, secretList, err := r.clusterConfigurationSyslogNG(loggingResources)
if err != nil {
Expand All @@ -204,9 +210,9 @@ func (r *LoggingReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
} else {
log.V(1).Info("flow configuration", "config", syslogNGConfig)

reconcilers = append(reconcilers, syslogng.New(r.Client, r.Log, &logging, syslogNGSpec, syslogNGConfig, secretList, reconcilerOpts).Reconcile)
reconcilers = append(reconcilers, syslogng.New(r.Client, r.Log, &logging, syslogNGSpec, syslogNGExternal, syslogNGConfig, secretList, reconcilerOpts).Reconcile)
}
loggingDataProvider = syslogng.NewDataProvider(r.Client, &logging)
loggingDataProvider = syslogng.NewDataProvider(r.Client, &logging, syslogNGExternal)
}

switch len(loggingResources.Fluentbits) {
Expand Down Expand Up @@ -261,7 +267,7 @@ func (r *LoggingReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
log.Error(errors.New("nodeagent definition conflict"), problem)
}
}
reconcilers = append(reconcilers, nodeagent.New(r.Client, r.Log, &logging, fluentdSpec, agents, reconcilerOpts, fluentd.NewDataProvider(r.Client, &logging, fluentdSpec)).Reconcile)
reconcilers = append(reconcilers, nodeagent.New(r.Client, r.Log, &logging, fluentdSpec, agents, reconcilerOpts, fluentd.NewDataProvider(r.Client, &logging, fluentdSpec, fluentdExternal)).Reconcile)
}

for _, rec := range reconcilers {
Expand All @@ -275,9 +281,73 @@ func (r *LoggingReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
}
}

if shouldReturn, err := r.fluentdConfigFinalizer(ctx, &logging, fluentdExternal); shouldReturn || err != nil {
return ctrl.Result{}, err
}

if shouldReturn, err := r.syslogNGConfigFinalizer(ctx, &logging, syslogNGExternal); shouldReturn || err != nil {
return ctrl.Result{}, err
}

return ctrl.Result{}, nil
}

func (r *LoggingReconciler) fluentdConfigFinalizer(ctx context.Context, logging *loggingv1beta1.Logging, externalFluentd *loggingv1beta1.FluentdConfig) (bool, error) {
fluentdConfigFinalizer := "fluentdconfig.logging.banzaicloud.io/finalizer"

if logging.DeletionTimestamp.IsZero() {
if externalFluentd != nil && !controllerutil.ContainsFinalizer(logging, fluentdConfigFinalizer) {
r.Log.Info("adding fluentdconfig finalizer")
controllerutil.AddFinalizer(logging, fluentdConfigFinalizer)
if err := r.Update(ctx, logging); err != nil {
return true, err
}
}
} else if externalFluentd != nil {
msg := fmt.Sprintf("refused to delete logging resource while fluentdConfig %s exists", client.ObjectKeyFromObject(externalFluentd))
r.EventRecorder.Event(logging, corev1.EventTypeWarning, "DeletionRefused", msg)
return false, errors.New(msg)
}

if controllerutil.ContainsFinalizer(logging, fluentdConfigFinalizer) && externalFluentd == nil {
r.Log.Info("removing fluentdconfig finalizer")
controllerutil.RemoveFinalizer(logging, fluentdConfigFinalizer)
if err := r.Update(ctx, logging); err != nil {
return true, err
}
}

return false, nil
}

func (r *LoggingReconciler) syslogNGConfigFinalizer(ctx context.Context, logging *loggingv1beta1.Logging, externalSyslogNG *loggingv1beta1.SyslogNGConfig) (bool, error) {
syslogNGConfigFinalizer := "syslogngconfig.logging.banzaicloud.io/finalizer"

if logging.DeletionTimestamp.IsZero() {
if externalSyslogNG != nil && !controllerutil.ContainsFinalizer(logging, syslogNGConfigFinalizer) {
r.Log.Info("adding syslogngconfig finalizer")
controllerutil.AddFinalizer(logging, syslogNGConfigFinalizer)
if err := r.Update(ctx, logging); err != nil {
return true, err
}
}
} else if externalSyslogNG != nil {
msg := fmt.Sprintf("refused to delete logging resource while syslogNGConfig %s exists", client.ObjectKeyFromObject(externalSyslogNG))
r.EventRecorder.Event(logging, corev1.EventTypeWarning, "DeletionRefused", msg)
return false, errors.New(msg)
}

if controllerutil.ContainsFinalizer(logging, syslogNGConfigFinalizer) && externalSyslogNG == nil {
r.Log.Info("removing syslogngconfig finalizer")
controllerutil.RemoveFinalizer(logging, syslogNGConfigFinalizer)
if err := r.Update(ctx, logging); err != nil {
return true, err
}
}

return false, nil
}

func (r *LoggingReconciler) dynamicDefaults(ctx context.Context, log logr.Logger, syslogNGSpec *v1beta1.SyslogNGSpec) {
nodes := corev1.NodeList{}
if err := r.Client.List(ctx, &nodes); err != nil {
Expand Down Expand Up @@ -372,6 +442,7 @@ func (r *LoggingReconciler) clusterConfigurationSyslogNG(resources model.Logging
Path: syslogng.OutputSecretPath,
}

_, syslogngSpec := resources.GetSyslogNGSpec()
in := syslogngconfig.Input{
Name: resources.Logging.Name,
Namespace: resources.Logging.Namespace,
Expand All @@ -381,7 +452,7 @@ func (r *LoggingReconciler) clusterConfigurationSyslogNG(resources model.Logging
Flows: resources.SyslogNG.Flows,
SecretLoaderFactory: &slf,
SourcePort: syslogng.ServicePort,
SyslogNGSpec: resources.GetSyslogNGSpec(),
SyslogNGSpec: syslogngSpec,
}
var b strings.Builder
if err := syslogngconfig.RenderConfigInto(in, &b); err != nil {
Expand Down
5 changes: 3 additions & 2 deletions controllers/logging/logging_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,13 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"

controllers "github.com/kube-logging/logging-operator/controllers/logging"
"github.com/kube-logging/logging-operator/pkg/resources/fluentd"
"github.com/kube-logging/logging-operator/pkg/resources/model"
"github.com/kube-logging/logging-operator/pkg/sdk/logging/api/v1beta1"
"github.com/kube-logging/logging-operator/pkg/sdk/logging/model/output"
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
)

var (
Expand Down Expand Up @@ -1410,7 +1411,7 @@ func beforeEachWithError(t *testing.T, errors chan<- error) func() {
})
g.Expect(err).NotTo(gomega.HaveOccurred())

flowReconciler := controllers.NewLoggingReconciler(mgr.GetClient(), ctrl.Log.WithName("controllers").WithName("Flow"))
flowReconciler := controllers.NewLoggingReconciler(mgr.GetClient(), mgr.GetEventRecorderFor("logging-operator"), ctrl.Log.WithName("controllers").WithName("Flow"))

var stopped bool
wrappedReconciler := duplicateRequest(t, flowReconciler, &stopped, errors)
Expand Down
3 changes: 2 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func main() {
os.Exit(1)
}

loggingReconciler := loggingControllers.NewLoggingReconciler(mgr.GetClient(), ctrl.Log.WithName("logging"))
loggingReconciler := loggingControllers.NewLoggingReconciler(mgr.GetClient(), mgr.GetEventRecorderFor("logging-operator"), ctrl.Log.WithName("logging"))

if err := (&extensionsControllers.EventTailerReconciler{
Client: mgr.GetClient(),
Expand Down Expand Up @@ -216,6 +216,7 @@ func main() {

// +kubebuilder:scaffold:builder
setupLog.Info("starting manager")

if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
setupLog.Error(err, "problem running manager")
os.Exit(1)
Expand Down
5 changes: 3 additions & 2 deletions pkg/resources/fluentbit/configsecret.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ func (r *Reconciler) configSecret() (runtime.Object, reconciler.DesiredState, er
return nil, nil, errs
}

fluentdSpec := loggingResources.GetFluentdSpec()
_, fluentdSpec := loggingResources.GetFluentd()

if fluentdSpec != nil {
fluentbitTargetHost := r.fluentbitSpec.TargetHost
Expand Down Expand Up @@ -369,7 +369,8 @@ func (r *Reconciler) configSecret() (runtime.Object, reconciler.DesiredState, er
}
}

if loggingResources.GetSyslogNGSpec() != nil {
_, syslogNGSPec := loggingResources.GetSyslogNGSpec()
if syslogNGSPec != nil {
input.SyslogNGOutput = newSyslogNGOutputConfig()
input.SyslogNGOutput.Host = aggregatorEndpoint(r.Logging, syslogng.ServiceName)
input.SyslogNGOutput.Port = syslogng.ServicePort
Expand Down
5 changes: 3 additions & 2 deletions pkg/resources/fluentbit/tenants.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,8 @@ func (r *Reconciler) configureOutputsForTenants(ctx context.Context, tenants []v
continue
}

if loggingResources.GetFluentdSpec() != nil {
_, fluentdSpec := loggingResources.GetFluentd()
if fluentdSpec != nil {
if input.FluentForwardOutput == nil {
input.FluentForwardOutput = &fluentForwardOutputConfig{}
}
Expand All @@ -104,7 +105,7 @@ func (r *Reconciler) configureOutputsForTenants(ctx context.Context, tenants []v
Host: aggregatorEndpoint(logging, fluentd.ServiceName),
Port: fluentd.ServicePort,
})
} else if loggingResources.GetSyslogNGSpec() != nil {
} else if _, syslogNGSPec := loggingResources.GetSyslogNGSpec(); syslogNGSPec != nil {
if input.SyslogNGOutput == nil {
input.SyslogNGOutput = newSyslogNGOutputConfig()
}
Expand Down
16 changes: 5 additions & 11 deletions pkg/resources/fluentd/appconfigmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,17 +262,11 @@ func (r *Reconciler) newCheckPod(hashKey string, fluentdSpec v1beta1.FluentdSpec
Tolerations: fluentdSpec.Tolerations,
Affinity: fluentdSpec.Affinity,
PriorityClassName: fluentdSpec.PodPriorityClassName,
SecurityContext: &corev1.PodSecurityContext{
RunAsNonRoot: fluentdSpec.Security.PodSecurityContext.RunAsNonRoot,
FSGroup: fluentdSpec.Security.PodSecurityContext.FSGroup,
RunAsUser: fluentdSpec.Security.PodSecurityContext.RunAsUser,
RunAsGroup: fluentdSpec.Security.PodSecurityContext.RunAsGroup,
SeccompProfile: fluentdSpec.Security.PodSecurityContext.SeccompProfile,
},
Volumes: volumes,
ImagePullSecrets: fluentdSpec.Image.ImagePullSecrets,
InitContainers: initContainer,
Containers: container,
SecurityContext: fluentdSpec.Security.PodSecurityContext,
Volumes: volumes,
ImagePullSecrets: fluentdSpec.Image.ImagePullSecrets,
InitContainers: initContainer,
Containers: container,
},
}
if fluentdSpec.ConfigCheckAnnotations != nil {
Expand Down
18 changes: 10 additions & 8 deletions pkg/resources/fluentd/dataprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,23 +25,25 @@ import (
)

type DataProvider struct {
client client.Client
logging *v1beta1.Logging
fluentdSpec *v1beta1.FluentdSpec
client client.Client
logging *v1beta1.Logging
fluentdSpec *v1beta1.FluentdSpec
fluentdConfig *v1beta1.FluentdConfig
}

func NewDataProvider(client client.Client, logging *v1beta1.Logging, fluentdSpec *v1beta1.FluentdSpec) *DataProvider {
func NewDataProvider(client client.Client, logging *v1beta1.Logging, fluentdSpec *v1beta1.FluentdSpec, fluentdConfig *v1beta1.FluentdConfig) *DataProvider {
return &DataProvider{
client: client,
logging: logging,
fluentdSpec: fluentdSpec,
client: client,
logging: logging,
fluentdSpec: fluentdSpec,
fluentdConfig: fluentdConfig,
}
}

func (p *DataProvider) GetReplicaCount(ctx context.Context) (*int32, error) {
if p.fluentdSpec != nil {
sts := &v1.StatefulSet{}
om := p.logging.FluentdObjectMeta(StatefulSetName, ComponentFluentd, *p.fluentdSpec)
om := p.logging.FluentdObjectMeta(StatefulSetName, ComponentFluentd, *p.fluentdSpec, p.fluentdConfig)
err := p.client.Get(ctx, types.NamespacedName{Namespace: om.Namespace, Name: om.Name}, sts)
if err != nil {
return nil, errors.WrapIf(client.IgnoreNotFound(err), "getting fluentd statefulset")
Expand Down
10 changes: 2 additions & 8 deletions pkg/resources/fluentd/drainjob.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,8 @@ func (r *Reconciler) drainerJobFor(pvc corev1.PersistentVolumeClaim, fluentdSpec
Affinity: fluentdSpec.Affinity,
TopologySpreadConstraints: fluentdSpec.TopologySpreadConstraints,
PriorityClassName: fluentdSpec.PodPriorityClassName,
SecurityContext: &corev1.PodSecurityContext{
RunAsNonRoot: fluentdSpec.Security.PodSecurityContext.RunAsNonRoot,
FSGroup: fluentdSpec.Security.PodSecurityContext.FSGroup,
RunAsUser: fluentdSpec.Security.PodSecurityContext.RunAsUser,
RunAsGroup: fluentdSpec.Security.PodSecurityContext.RunAsGroup,
SeccompProfile: fluentdSpec.Security.PodSecurityContext.SeccompProfile,
},
RestartPolicy: corev1.RestartPolicyNever,
SecurityContext: fluentdSpec.Security.PodSecurityContext,
RestartPolicy: corev1.RestartPolicyNever,
},
},
}
Expand Down
Loading

0 comments on commit 0302e73

Please sign in to comment.