diff --git a/controllers/logging/logging_controller.go b/controllers/logging/logging_controller.go index 217cb0e26..25e39e1d7 100644 --- a/controllers/logging/logging_controller.go +++ b/controllers/logging/logging_controller.go @@ -107,10 +107,6 @@ func (r *LoggingReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct log.Info("WARNING PormetheusRule is not supported in the cluster") } - if logging.Spec.WatchNamespaces != nil && logging.Spec.WatchNamespaceSelector != nil { - log.Info("WARNING watchNamespaceSelector will be omitted if configured along with watchNamespaces") - } - if err := logging.SetDefaults(); err != nil { return reconcile.Result{}, err } @@ -434,9 +430,28 @@ func SetupLoggingWithManager(mgr ctrl.Manager, logger logr.Logger) *ctrl.Builder return nil }) + // Trigger reconcile for all logging resources on namespace changes that define a watchNamespaceSelector + namespaceRequestMapper := handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, obj client.Object) []reconcile.Request { + var loggingList loggingv1beta1.LoggingList + if err := mgr.GetCache().List(ctx, &loggingList); err != nil { + logger.Error(err, "failed to list logging resources") + return nil + } + requests := make([]reconcile.Request, 0) + for _, l := range loggingList.Items { + if l.Spec.WatchNamespaceSelector != nil { + requests = append(requests, reconcile.Request{NamespacedName: types.NamespacedName{ + Name: l.Name, + }}) + } + } + return requests + }) + builder := ctrl.NewControllerManagedBy(mgr). For(&loggingv1beta1.Logging{}). Owns(&corev1.Pod{}). + Watches(&corev1.Namespace{}, namespaceRequestMapper). Watches(&loggingv1beta1.ClusterOutput{}, requestMapper). Watches(&loggingv1beta1.ClusterFlow{}, requestMapper). Watches(&loggingv1beta1.Output{}, requestMapper). diff --git a/controllers/logging/logging_controller_test.go b/controllers/logging/logging_controller_test.go index dc3ab4804..e6b1a9ba7 100644 --- a/controllers/logging/logging_controller_test.go +++ b/controllers/logging/logging_controller_test.go @@ -42,6 +42,7 @@ import ( controllers "github.com/kube-logging/logging-operator/controllers/logging" "github.com/kube-logging/logging-operator/pkg/resources/fluentd" + "github.com/kube-logging/logging-operator/pkg/resources/model" "github.com/kube-logging/logging-operator/pkg/sdk/logging/api/v1beta1" "github.com/kube-logging/logging-operator/pkg/sdk/logging/model/output" ) @@ -1160,6 +1161,153 @@ func TestFlowWithDanglingLocalAndGlobalOutputRefs(t *testing.T) { }, timeout).Should(gomega.BeTrue()) } +func TestWatchNamespaces(t *testing.T) { + g := gomega.NewGomegaWithT(t) + defer beforeEach(t)() + + defer ensureCreated(t, &corev1.Namespace{ + ObjectMeta: v1.ObjectMeta{ + Name: "test-bylabel-1", + Labels: map[string]string{ + "bylabel": "test1", + }, + }, + })() + defer ensureCreated(t, &corev1.Namespace{ + ObjectMeta: v1.ObjectMeta{ + Name: "test-bylabel-2", + Labels: map[string]string{ + "bylabel": "test2", + }, + }, + })() + + cases := []struct { + name string + logging *v1beta1.Logging + expectedResult func() []string + expectError bool + }{ + { + name: "full list", + logging: &v1beta1.Logging{ + ObjectMeta: v1.ObjectMeta{ + Name: "test-" + uuid.New()[:8], + }, + Spec: v1beta1.LoggingSpec{ + WatchNamespaces: []string{}, + WatchNamespaceSelector: nil, + }, + }, + expectedResult: func() []string { + allNamespaces := &corev1.NamespaceList{} + err := mgr.GetClient().List(context.TODO(), allNamespaces) + if err != nil { + t.Fatalf("unexpected error when getting namespaces %s", err) + } + items := []string{} + for _, i := range allNamespaces.Items { + items = append(items, i.Name) + } + return items + }, + }, + { + name: "explicit list", + logging: &v1beta1.Logging{ + ObjectMeta: v1.ObjectMeta{ + Name: "test-" + uuid.New()[:8], + }, + Spec: v1beta1.LoggingSpec{ + WatchNamespaces: []string{"test-explicit-1", "test-explicit-2"}, + WatchNamespaceSelector: nil, + }, + }, + expectedResult: func() []string { return []string{"test-explicit-1", "test-explicit-2"} }, + }, + { + name: "bylabel list", + logging: &v1beta1.Logging{ + ObjectMeta: v1.ObjectMeta{ + Name: "test-" + uuid.New()[:8], + }, + Spec: v1beta1.LoggingSpec{ + WatchNamespaces: []string{}, + WatchNamespaceSelector: &v1.LabelSelector{ + MatchLabels: map[string]string{ + "bylabel": "test1", + }, + }, + }, + }, + expectedResult: func() []string { return []string{"test-bylabel-1"} }, + }, + { + name: "bylabel negative list (label exists but value should be different)", + logging: &v1beta1.Logging{ + ObjectMeta: v1.ObjectMeta{ + Name: "test-" + uuid.New()[:8], + }, + Spec: v1beta1.LoggingSpec{ + WatchNamespaces: []string{}, + WatchNamespaceSelector: &v1.LabelSelector{ + MatchExpressions: []v1.LabelSelectorRequirement{ + { + Key: "bylabel", + Operator: v1.LabelSelectorOpExists, + }, + { + Key: "bylabel", + Operator: v1.LabelSelectorOpNotIn, + Values: []string{"test1"}, + }, + }, + }, + }, + }, + expectedResult: func() []string { return []string{"test-bylabel-2"} }, + }, + { + name: "merge two sets uniquely", + logging: &v1beta1.Logging{ + ObjectMeta: v1.ObjectMeta{ + Name: "test-" + uuid.New()[:8], + }, + Spec: v1beta1.LoggingSpec{ + WatchNamespaces: []string{"a", "b", "c", "test-bylabel-1"}, + WatchNamespaceSelector: &v1.LabelSelector{ + MatchExpressions: []v1.LabelSelectorRequirement{ + { + Key: "bylabel", + Operator: v1.LabelSelectorOpExists, + }, + }, + }, + }, + }, + expectedResult: func() []string { return []string{"a", "b", "c", "test-bylabel-1", "test-bylabel-2"} }, + }, + } + + repo := model.NewLoggingResourceRepository(mgr.GetClient(), mgr.GetLogger()) + + for _, c := range cases { + if c.expectError { + _, err := repo.UniqueWatchNamespaces(context.TODO(), c.logging) + if c.expectError && err == nil { + t.Fatalf("expected error for test case %s", c.name) + } + continue + } + + g.Eventually(func() ([]string, error) { + return repo.UniqueWatchNamespaces(context.TODO(), c.logging) + }, timeout).Should(gomega.ConsistOf( + c.expectedResult(), + )) + } +} + func beforeEach(t *testing.T) func() { return beforeEachWithError(t, nil) } diff --git a/pkg/resources/model/repository.go b/pkg/resources/model/repository.go index 344dbfd75..74f5c6f79 100644 --- a/pkg/resources/model/repository.go +++ b/pkg/resources/model/repository.go @@ -63,33 +63,13 @@ func (r LoggingResourceRepository) LoggingResourcesFor(ctx context.Context, logg res.Fluentbits, err = r.FluentbitsFor(ctx, logging) errs = errors.Append(errs, err) - watchNamespaces := logging.Spec.WatchNamespaces - nsLabelSelector := logging.Spec.WatchNamespaceSelector - if len(watchNamespaces) == 0 { - var nsList corev1.NamespaceList - var nsListOptions = &client.ListOptions{} - if nsLabelSelector != nil { - selector, err := metav1.LabelSelectorAsSelector(nsLabelSelector) - if err != nil { - errs = errors.Append(errs, errors.WrapIf(err, "error in watchNamespaceSelector")) - return - } - nsListOptions = &client.ListOptions{ - LabelSelector: selector, - } - } - if err := r.Client.List(ctx, &nsList, nsListOptions); err != nil { - errs = errors.Append(errs, errors.WrapIf(err, "listing namespaces")) - return - } - - for _, i := range nsList.Items { - watchNamespaces = append(watchNamespaces, i.Name) - } + uniqueWatchNamespaces, err := r.UniqueWatchNamespaces(ctx, &logging) + if err != nil { + errs = errors.Append(errs, err) + return } - sort.Strings(watchNamespaces) - for _, ns := range watchNamespaces { + for _, ns := range uniqueWatchNamespaces { { flows, err := r.FlowsInNamespaceFor(ctx, ns, logging) res.Fluentd.Flows = append(res.Fluentd.Flows, flows...) @@ -118,6 +98,41 @@ func (r LoggingResourceRepository) LoggingResourcesFor(ctx context.Context, logg return } +func (r LoggingResourceRepository) UniqueWatchNamespaces(ctx context.Context, logging *v1beta1.Logging) ([]string, error) { + watchNamespaces := logging.Spec.WatchNamespaces + nsLabelSelector := logging.Spec.WatchNamespaceSelector + if len(watchNamespaces) == 0 || nsLabelSelector != nil { + var nsList corev1.NamespaceList + var nsListOptions = &client.ListOptions{} + if nsLabelSelector != nil { + selector, err := metav1.LabelSelectorAsSelector(nsLabelSelector) + if err != nil { + return nil, errors.WrapIf(err, "error in watchNamespaceSelector") + } + nsListOptions = &client.ListOptions{ + LabelSelector: selector, + } + } + if err := r.Client.List(ctx, &nsList, nsListOptions); err != nil { + return nil, errors.WrapIf(err, "listing namespaces for watchNamespaceSelector") + } + for _, i := range nsList.Items { + watchNamespaces = append(watchNamespaces, i.Name) + } + } + uniqueWatchNamespaces := []string{} + var previousNamespace string + sort.Strings(watchNamespaces) + + for _, n := range watchNamespaces { + if n != previousNamespace { + uniqueWatchNamespaces = append(uniqueWatchNamespaces, n) + } + previousNamespace = n + } + return uniqueWatchNamespaces, nil +} + func (r LoggingResourceRepository) ClusterFlowsFor(ctx context.Context, logging v1beta1.Logging) ([]v1beta1.ClusterFlow, error) { var list v1beta1.ClusterFlowList if err := r.Client.List(ctx, &list, clusterResourceListOpts(logging)...); err != nil {