Skip to content

Commit

Permalink
watchNamespaces improvements (#1392)
Browse files Browse the repository at this point in the history
Make logging.spec.watchNamespaces (explicit static list) and logging.spec.watchNamespaceSelector (label based dynamic list) additive instead of exclusive for hopefully better semantics.
Add tests for better coverage (looking into reusing the same thing for a different use case as well)
Add namespace watches and trigger reconcile on loggings with watchNamespaceSelector defined

Signed-off-by: Peter Wilcsinszky <[email protected]>
  • Loading branch information
pepov authored Jul 25, 2023
1 parent ea9013a commit a98426a
Show file tree
Hide file tree
Showing 3 changed files with 207 additions and 29 deletions.
23 changes: 19 additions & 4 deletions controllers/logging/logging_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,6 @@ func (r *LoggingReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
log.Info("WARNING PormetheusRule is not supported in the cluster")
}

if logging.Spec.WatchNamespaces != nil && logging.Spec.WatchNamespaceSelector != nil {
log.Info("WARNING watchNamespaceSelector will be omitted if configured along with watchNamespaces")
}

if err := logging.SetDefaults(); err != nil {
return reconcile.Result{}, err
}
Expand Down Expand Up @@ -434,9 +430,28 @@ func SetupLoggingWithManager(mgr ctrl.Manager, logger logr.Logger) *ctrl.Builder
return nil
})

// Trigger reconcile for all logging resources on namespace changes that define a watchNamespaceSelector
namespaceRequestMapper := handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, obj client.Object) []reconcile.Request {
var loggingList loggingv1beta1.LoggingList
if err := mgr.GetCache().List(ctx, &loggingList); err != nil {
logger.Error(err, "failed to list logging resources")
return nil
}
requests := make([]reconcile.Request, 0)
for _, l := range loggingList.Items {
if l.Spec.WatchNamespaceSelector != nil {
requests = append(requests, reconcile.Request{NamespacedName: types.NamespacedName{
Name: l.Name,
}})
}
}
return requests
})

builder := ctrl.NewControllerManagedBy(mgr).
For(&loggingv1beta1.Logging{}).
Owns(&corev1.Pod{}).
Watches(&corev1.Namespace{}, namespaceRequestMapper).
Watches(&loggingv1beta1.ClusterOutput{}, requestMapper).
Watches(&loggingv1beta1.ClusterFlow{}, requestMapper).
Watches(&loggingv1beta1.Output{}, requestMapper).
Expand Down
148 changes: 148 additions & 0 deletions controllers/logging/logging_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (

controllers "github.com/kube-logging/logging-operator/controllers/logging"
"github.com/kube-logging/logging-operator/pkg/resources/fluentd"
"github.com/kube-logging/logging-operator/pkg/resources/model"
"github.com/kube-logging/logging-operator/pkg/sdk/logging/api/v1beta1"
"github.com/kube-logging/logging-operator/pkg/sdk/logging/model/output"
)
Expand Down Expand Up @@ -1160,6 +1161,153 @@ func TestFlowWithDanglingLocalAndGlobalOutputRefs(t *testing.T) {
}, timeout).Should(gomega.BeTrue())
}

func TestWatchNamespaces(t *testing.T) {
g := gomega.NewGomegaWithT(t)
defer beforeEach(t)()

defer ensureCreated(t, &corev1.Namespace{
ObjectMeta: v1.ObjectMeta{
Name: "test-bylabel-1",
Labels: map[string]string{
"bylabel": "test1",
},
},
})()
defer ensureCreated(t, &corev1.Namespace{
ObjectMeta: v1.ObjectMeta{
Name: "test-bylabel-2",
Labels: map[string]string{
"bylabel": "test2",
},
},
})()

cases := []struct {
name string
logging *v1beta1.Logging
expectedResult func() []string
expectError bool
}{
{
name: "full list",
logging: &v1beta1.Logging{
ObjectMeta: v1.ObjectMeta{
Name: "test-" + uuid.New()[:8],
},
Spec: v1beta1.LoggingSpec{
WatchNamespaces: []string{},
WatchNamespaceSelector: nil,
},
},
expectedResult: func() []string {
allNamespaces := &corev1.NamespaceList{}
err := mgr.GetClient().List(context.TODO(), allNamespaces)
if err != nil {
t.Fatalf("unexpected error when getting namespaces %s", err)
}
items := []string{}
for _, i := range allNamespaces.Items {
items = append(items, i.Name)
}
return items
},
},
{
name: "explicit list",
logging: &v1beta1.Logging{
ObjectMeta: v1.ObjectMeta{
Name: "test-" + uuid.New()[:8],
},
Spec: v1beta1.LoggingSpec{
WatchNamespaces: []string{"test-explicit-1", "test-explicit-2"},
WatchNamespaceSelector: nil,
},
},
expectedResult: func() []string { return []string{"test-explicit-1", "test-explicit-2"} },
},
{
name: "bylabel list",
logging: &v1beta1.Logging{
ObjectMeta: v1.ObjectMeta{
Name: "test-" + uuid.New()[:8],
},
Spec: v1beta1.LoggingSpec{
WatchNamespaces: []string{},
WatchNamespaceSelector: &v1.LabelSelector{
MatchLabels: map[string]string{
"bylabel": "test1",
},
},
},
},
expectedResult: func() []string { return []string{"test-bylabel-1"} },
},
{
name: "bylabel negative list (label exists but value should be different)",
logging: &v1beta1.Logging{
ObjectMeta: v1.ObjectMeta{
Name: "test-" + uuid.New()[:8],
},
Spec: v1beta1.LoggingSpec{
WatchNamespaces: []string{},
WatchNamespaceSelector: &v1.LabelSelector{
MatchExpressions: []v1.LabelSelectorRequirement{
{
Key: "bylabel",
Operator: v1.LabelSelectorOpExists,
},
{
Key: "bylabel",
Operator: v1.LabelSelectorOpNotIn,
Values: []string{"test1"},
},
},
},
},
},
expectedResult: func() []string { return []string{"test-bylabel-2"} },
},
{
name: "merge two sets uniquely",
logging: &v1beta1.Logging{
ObjectMeta: v1.ObjectMeta{
Name: "test-" + uuid.New()[:8],
},
Spec: v1beta1.LoggingSpec{
WatchNamespaces: []string{"a", "b", "c", "test-bylabel-1"},
WatchNamespaceSelector: &v1.LabelSelector{
MatchExpressions: []v1.LabelSelectorRequirement{
{
Key: "bylabel",
Operator: v1.LabelSelectorOpExists,
},
},
},
},
},
expectedResult: func() []string { return []string{"a", "b", "c", "test-bylabel-1", "test-bylabel-2"} },
},
}

repo := model.NewLoggingResourceRepository(mgr.GetClient(), mgr.GetLogger())

for _, c := range cases {
if c.expectError {
_, err := repo.UniqueWatchNamespaces(context.TODO(), c.logging)
if c.expectError && err == nil {
t.Fatalf("expected error for test case %s", c.name)
}
continue
}

g.Eventually(func() ([]string, error) {
return repo.UniqueWatchNamespaces(context.TODO(), c.logging)
}, timeout).Should(gomega.ConsistOf(
c.expectedResult(),
))
}
}

func beforeEach(t *testing.T) func() {
return beforeEachWithError(t, nil)
}
Expand Down
65 changes: 40 additions & 25 deletions pkg/resources/model/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,33 +63,13 @@ func (r LoggingResourceRepository) LoggingResourcesFor(ctx context.Context, logg
res.Fluentbits, err = r.FluentbitsFor(ctx, logging)
errs = errors.Append(errs, err)

watchNamespaces := logging.Spec.WatchNamespaces
nsLabelSelector := logging.Spec.WatchNamespaceSelector
if len(watchNamespaces) == 0 {
var nsList corev1.NamespaceList
var nsListOptions = &client.ListOptions{}
if nsLabelSelector != nil {
selector, err := metav1.LabelSelectorAsSelector(nsLabelSelector)
if err != nil {
errs = errors.Append(errs, errors.WrapIf(err, "error in watchNamespaceSelector"))
return
}
nsListOptions = &client.ListOptions{
LabelSelector: selector,
}
}
if err := r.Client.List(ctx, &nsList, nsListOptions); err != nil {
errs = errors.Append(errs, errors.WrapIf(err, "listing namespaces"))
return
}

for _, i := range nsList.Items {
watchNamespaces = append(watchNamespaces, i.Name)
}
uniqueWatchNamespaces, err := r.UniqueWatchNamespaces(ctx, &logging)
if err != nil {
errs = errors.Append(errs, err)
return
}
sort.Strings(watchNamespaces)

for _, ns := range watchNamespaces {
for _, ns := range uniqueWatchNamespaces {
{
flows, err := r.FlowsInNamespaceFor(ctx, ns, logging)
res.Fluentd.Flows = append(res.Fluentd.Flows, flows...)
Expand Down Expand Up @@ -118,6 +98,41 @@ func (r LoggingResourceRepository) LoggingResourcesFor(ctx context.Context, logg
return
}

func (r LoggingResourceRepository) UniqueWatchNamespaces(ctx context.Context, logging *v1beta1.Logging) ([]string, error) {
watchNamespaces := logging.Spec.WatchNamespaces
nsLabelSelector := logging.Spec.WatchNamespaceSelector
if len(watchNamespaces) == 0 || nsLabelSelector != nil {
var nsList corev1.NamespaceList
var nsListOptions = &client.ListOptions{}
if nsLabelSelector != nil {
selector, err := metav1.LabelSelectorAsSelector(nsLabelSelector)
if err != nil {
return nil, errors.WrapIf(err, "error in watchNamespaceSelector")
}
nsListOptions = &client.ListOptions{
LabelSelector: selector,
}
}
if err := r.Client.List(ctx, &nsList, nsListOptions); err != nil {
return nil, errors.WrapIf(err, "listing namespaces for watchNamespaceSelector")
}
for _, i := range nsList.Items {
watchNamespaces = append(watchNamespaces, i.Name)
}
}
uniqueWatchNamespaces := []string{}
var previousNamespace string
sort.Strings(watchNamespaces)

for _, n := range watchNamespaces {
if n != previousNamespace {
uniqueWatchNamespaces = append(uniqueWatchNamespaces, n)
}
previousNamespace = n
}
return uniqueWatchNamespaces, nil
}

func (r LoggingResourceRepository) ClusterFlowsFor(ctx context.Context, logging v1beta1.Logging) ([]v1beta1.ClusterFlow, error) {
var list v1beta1.ClusterFlowList
if err := r.Client.List(ctx, &list, clusterResourceListOpts(logging)...); err != nil {
Expand Down

0 comments on commit a98426a

Please sign in to comment.