From 0930b48d58967a4390a9adaffbd6ba2624a16112 Mon Sep 17 00:00:00 2001 From: Danil Grigorev Date: Thu, 13 Jul 2023 17:54:31 +0200 Subject: [PATCH] Expose selector for default cache pre-filtering --- util/predicates/cluster_predicates.go | 4 +- util/predicates/expression_predicates.go | 153 +++++++---------------- util/predicates/generic_predicates.go | 57 +++------ 3 files changed, 60 insertions(+), 154 deletions(-) diff --git a/util/predicates/cluster_predicates.go b/util/predicates/cluster_predicates.go index 1fdfa678f56e..55756b7b4f78 100644 --- a/util/predicates/cluster_predicates.go +++ b/util/predicates/cluster_predicates.go @@ -161,7 +161,7 @@ func ClusterUpdateUnpaused(logger logr.Logger) predicate.Funcs { // handler.EnqueueRequestsFromMapFunc(clusterToMachines) // predicates.ClusterUnpaused(r.Log), // ) -func ClusterUnpaused(logger logr.Logger) predicate.Funcs { +func ClusterUnpaused(logger logr.Logger) predicate.Predicate { log := logger.WithValues("predicate", "ClusterUnpaused") // Use any to ensure we process either create or update events we care about @@ -218,7 +218,7 @@ func ClusterControlPlaneInitialized(logger logr.Logger) predicate.Funcs { // handler.EnqueueRequestsFromMapFunc(clusterToMachines) // predicates.ClusterUnpausedAndInfrastructureReady(r.Log), // ) -func ClusterUnpausedAndInfrastructureReady(logger logr.Logger) predicate.Funcs { +func ClusterUnpausedAndInfrastructureReady(logger logr.Logger) predicate.Predicate { log := logger.WithValues("predicate", "ClusterUnpausedAndInfrastructureReady") // Only continue processing create events if both not paused and infrastructure is ready diff --git a/util/predicates/expression_predicates.go b/util/predicates/expression_predicates.go index 7f1f1f5d363b..4541e98f8424 100644 --- a/util/predicates/expression_predicates.go +++ b/util/predicates/expression_predicates.go @@ -20,138 +20,73 @@ import ( "fmt" "github.com/go-logr/logr" - "github.com/google/cel-go/cel" - "github.com/google/cel-go/checker/decls" - "github.com/google/cel-go/common/types" - "github.com/pkg/errors" - "k8s.io/apimachinery/pkg/runtime" - "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/event" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "sigs.k8s.io/controller-runtime/pkg/predicate" -) -var expressionMatcher *ExpressionMatcher + clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1" +) -// ExpressionMatcher holds initialized CEL program for evaluation of incoming objects on events -// and filtering which objects to reconcile. -type ExpressionMatcher struct { - program cel.Program +// LabelMatcher holds initialized Label Selector predicate for evaluation of incoming +// objects on events and filtering which objects to reconcile. +type LabelMatcher struct { + selector labels.Selector + predicate predicate.Predicate } -// InitExpressionMatcher initializes expression which will apply on all processed objects in events in every controller. -func InitExpressionMatcher(log logr.Logger, expression string) error { - if expression == "" { - return nil +// ComposeFilterExpression will return a valid label selector string representation +// by converting watchFilter to a default for watchExpression. +func ComposeFilterExpression(watchExpression, watchFilter string) string { + if watchExpression == "" && watchFilter != "" { + return fmt.Sprintf("%s = %s", clusterv1.WatchLabel, watchFilter) } - program, err := getProgram(log, expression) - if err != nil { - log.Error(err, fmt.Sprintf("Unable to compile CEL program for %s", expression)) - return err - } - log.Info("Initialized expression predicate for the given rule: ", expression) - - expressionMatcher = &ExpressionMatcher{program} - return nil + return watchExpression } -// GetExpressionMatcher returns existing ExpressionMatcher. -func GetExpressionMatcher() *ExpressionMatcher { - return expressionMatcher -} - -func (m *ExpressionMatcher) matches(log logr.Logger, o client.Object) bool { - matches, err := m.matchesExpression(log, o) - if err != nil { - log.V(6).Info(fmt.Sprintf("Rejecting object %+v due to errror: %s", o, err)) - return false - } - if !matches { - log.V(6).Info(fmt.Sprintf("Rejecting object - does not match expression %+v", o)) +// InitLabelMatcher initializes expression which will apply on all processed objects in events in every controller. +func InitLabelMatcher(log logr.Logger, labelExpression string) (LabelMatcher, error) { + matcher := LabelMatcher{} + if labelExpression == "" { + return matcher, nil } - return matches -} - -func getProgram(log logr.Logger, expression string) (cel.Program, error) { - declarations := cel.Declarations( - decls.NewVar("self", decls.NewMapType(decls.String, decls.Dyn)), - ) - - env, err := cel.NewEnv(declarations) + expr, err := metav1.ParseToLabelSelector(labelExpression) if err != nil { - return nil, errors.Wrap(err, "Unable to create CEL environment") + log.Error(err, fmt.Sprintf("Unable to compile LabelSelector from %s", labelExpression)) + return matcher, err } + log.Info("Initialized label selector predicate for the given rule: ", labelExpression) - ast, issues := env.Compile(expression) - if issues != nil && issues.Err() != nil { - log.Error(issues.Err(), "Unable to compile provided expression %s", expression) - return nil, issues.Err() - } - if ast.OutputType() != cel.BoolType { - return nil, errors.New("Expression should evaluate to boolean value") - } - - prg, err := env.Program(ast) - if err != nil { - log.Error(err, "Unable to create CEL instance") - return nil, err - } - - return prg, nil -} - -func (m *ExpressionMatcher) matchesExpression(log logr.Logger, o client.Object) (bool, error) { - if o == nil { - return false, nil - } - - unstructured, err := runtime.DefaultUnstructuredConverter.ToUnstructured(o) + labelPredicate, err := predicate.LabelSelectorPredicate(*expr) if err != nil { - log.Error(err, "Unable to convert object to unstructured") - return false, err + log.Error(err, fmt.Sprintf("Unable to compile label selector for %s", labelExpression)) + return matcher, err } - result, details, err := m.program.Eval(map[string]interface{}{ - "self": unstructured, - }) + selector, err := metav1.LabelSelectorAsSelector(expr) if err != nil { - log.Error(err, fmt.Sprintf("Can't eval object with provided expression: %v", details)) - return false, err + log.Error(err, "Unable to parse label selector for cache filtering for %s", labelExpression) + return matcher, err } - return result == types.True, nil + matcher.selector = selector + matcher.predicate = labelPredicate + return matcher, nil } -// PassThrough returns a predicate accepting anything. -func PassThrough() predicate.Funcs { - return predicate.Funcs{ - CreateFunc: func(e event.CreateEvent) bool { return true }, - UpdateFunc: func(e event.UpdateEvent) bool { return true }, - DeleteFunc: func(e event.DeleteEvent) bool { return true }, - GenericFunc: func(e event.GenericEvent) bool { return true }, - } +// Selector returns compiled label selector if it was initialized from expression +func (m *LabelMatcher) Selector() labels.Selector { + return m.selector } -// Matches returns a predicate that returns true only if the provided -// CEL expression matches on filtered resource. -func (m *ExpressionMatcher) Matches(log logr.Logger) predicate.Funcs { - if m == nil { - log.Info("Skipping filter apply, no expression was set") - return PassThrough() +// Matches returns a predicate that accepts objects only matching +// watch-filter or watch-expression value. +func (m *LabelMatcher) Matches(log logr.Logger) predicate.Predicate { + if m.predicate == nil { + log.Info("Skipping filter apply, no label expression was set") + return predicate.Funcs{} } - return predicate.Funcs{ - UpdateFunc: func(e event.UpdateEvent) bool { - return m.matches(log.WithValues("predicate", "ResourceMatchesExpression", "eventType", "update"), e.ObjectNew) - }, - CreateFunc: func(e event.CreateEvent) bool { - return m.matches(log.WithValues("predicate", "ResourceMatchesExpression", "eventType", "create"), e.Object) - }, - DeleteFunc: func(e event.DeleteEvent) bool { - return m.matches(log.WithValues("predicate", "ResourceMatchesExpression", "eventType", "delete"), e.Object) - }, - GenericFunc: func(e event.GenericEvent) bool { - return m.matches(log.WithValues("predicate", "ResourceMatchesExpression", "eventType", "generic"), e.Object) - }, - } + return m.predicate } diff --git a/util/predicates/generic_predicates.go b/util/predicates/generic_predicates.go index ccce5de6d1da..11310002c872 100644 --- a/util/predicates/generic_predicates.go +++ b/util/predicates/generic_predicates.go @@ -29,12 +29,12 @@ import ( ) // All returns a predicate that returns true only if all given predicates return true. -func All(logger logr.Logger, predicates ...predicate.Funcs) predicate.Funcs { +func All(logger logr.Logger, predicates ...predicate.Predicate) predicate.Predicate { return predicate.Funcs{ UpdateFunc: func(e event.UpdateEvent) bool { log := logger.WithValues("predicateAggregation", "All") for _, p := range predicates { - if !p.UpdateFunc(e) { + if !p.Update(e) { log.V(6).Info("One of the provided predicates returned false, blocking further processing") return false } @@ -45,7 +45,7 @@ func All(logger logr.Logger, predicates ...predicate.Funcs) predicate.Funcs { CreateFunc: func(e event.CreateEvent) bool { log := logger.WithValues("predicateAggregation", "All") for _, p := range predicates { - if !p.CreateFunc(e) { + if !p.Create(e) { log.V(6).Info("One of the provided predicates returned false, blocking further processing") return false } @@ -56,7 +56,7 @@ func All(logger logr.Logger, predicates ...predicate.Funcs) predicate.Funcs { DeleteFunc: func(e event.DeleteEvent) bool { log := logger.WithValues("predicateAggregation", "All") for _, p := range predicates { - if !p.DeleteFunc(e) { + if !p.Delete(e) { log.V(6).Info("One of the provided predicates returned false, blocking further processing") return false } @@ -67,7 +67,7 @@ func All(logger logr.Logger, predicates ...predicate.Funcs) predicate.Funcs { GenericFunc: func(e event.GenericEvent) bool { log := logger.WithValues("predicateAggregation", "All") for _, p := range predicates { - if !p.GenericFunc(e) { + if !p.Generic(e) { log.V(6).Info("One of the provided predicates returned false, blocking further processing") return false } @@ -79,12 +79,12 @@ func All(logger logr.Logger, predicates ...predicate.Funcs) predicate.Funcs { } // Any returns a predicate that returns true only if any given predicate returns true. -func Any(logger logr.Logger, predicates ...predicate.Funcs) predicate.Funcs { +func Any(logger logr.Logger, predicates ...predicate.Predicate) predicate.Predicate { return predicate.Funcs{ UpdateFunc: func(e event.UpdateEvent) bool { log := logger.WithValues("predicateAggregation", "Any") for _, p := range predicates { - if p.UpdateFunc(e) { + if p.Update(e) { log.V(6).Info("One of the provided predicates returned true, allowing further processing") return true } @@ -95,7 +95,7 @@ func Any(logger logr.Logger, predicates ...predicate.Funcs) predicate.Funcs { CreateFunc: func(e event.CreateEvent) bool { log := logger.WithValues("predicateAggregation", "Any") for _, p := range predicates { - if p.CreateFunc(e) { + if p.Create(e) { log.V(6).Info("One of the provided predicates returned true, allowing further processing") return true } @@ -106,7 +106,7 @@ func Any(logger logr.Logger, predicates ...predicate.Funcs) predicate.Funcs { DeleteFunc: func(e event.DeleteEvent) bool { log := logger.WithValues("predicateAggregation", "Any") for _, p := range predicates { - if p.DeleteFunc(e) { + if p.Delete(e) { log.V(6).Info("One of the provided predicates returned true, allowing further processing") return true } @@ -117,7 +117,7 @@ func Any(logger logr.Logger, predicates ...predicate.Funcs) predicate.Funcs { GenericFunc: func(e event.GenericEvent) bool { log := logger.WithValues("predicateAggregation", "Any") for _, p := range predicates { - if p.GenericFunc(e) { + if p.Generic(e) { log.V(6).Info("One of the provided predicates returned true, allowing further processing") return true } @@ -130,21 +130,8 @@ func Any(logger logr.Logger, predicates ...predicate.Funcs) predicate.Funcs { // ResourceHasFilterLabel returns a predicate that returns true only if the provided resource contains // a label with the WatchLabel key and the configured label value exactly. -func ResourceHasFilterLabel(logger logr.Logger, labelValue string) predicate.Funcs { - return predicate.Funcs{ - UpdateFunc: func(e event.UpdateEvent) bool { - return processIfLabelMatch(logger.WithValues("predicate", "ResourceHasFilterLabel", "eventType", "update"), e.ObjectNew, labelValue) - }, - CreateFunc: func(e event.CreateEvent) bool { - return processIfLabelMatch(logger.WithValues("predicate", "ResourceHasFilterLabel", "eventType", "create"), e.Object, labelValue) - }, - DeleteFunc: func(e event.DeleteEvent) bool { - return processIfLabelMatch(logger.WithValues("predicate", "ResourceHasFilterLabel", "eventType", "delete"), e.Object, labelValue) - }, - GenericFunc: func(e event.GenericEvent) bool { - return processIfLabelMatch(logger.WithValues("predicate", "ResourceHasFilterLabel", "eventType", "generic"), e.Object, labelValue) - }, - } +func ResourceHasFilterLabel(logger logr.Logger, labelSelector LabelMatcher) predicate.Predicate { + return labelSelector.Matches(logger) } // ResourceNotPaused returns a Predicate that returns true only if the provided resource does not contain the @@ -180,8 +167,8 @@ func ResourceNotPaused(logger logr.Logger) predicate.Funcs { // ResourceNotPausedAndHasFilterLabel returns a predicate that returns true only if the // ResourceNotPaused and ResourceHasFilterLabel predicates return true. -func ResourceNotPausedAndHasFilterLabel(logger logr.Logger, labelValue string) predicate.Funcs { - return All(logger, ResourceNotPaused(logger), ResourceHasFilterLabel(logger, labelValue)) +func ResourceNotPausedAndHasFilterLabel(logger logr.Logger, labelSelector LabelMatcher) predicate.Predicate { + return All(logger, ResourceNotPaused(logger), labelSelector.Matches(logger)) } func processIfNotPaused(logger logr.Logger, obj client.Object) bool { @@ -195,22 +182,6 @@ func processIfNotPaused(logger logr.Logger, obj client.Object) bool { return true } -func processIfLabelMatch(logger logr.Logger, obj client.Object, labelValue string) bool { - // Return early if no labelValue was set. - if labelValue == "" { - return true - } - - kind := strings.ToLower(obj.GetObjectKind().GroupVersionKind().Kind) - log := logger.WithValues("namespace", obj.GetNamespace(), kind, obj.GetName()) - if labels.HasWatchLabel(obj, labelValue) { - log.V(6).Info("Resource matches label, will attempt to map resource") - return true - } - log.V(4).Info("Resource does not match label, will not attempt to map resource") - return false -} - // ResourceIsNotExternallyManaged returns a predicate that returns true only if the resource does not contain // the externally managed annotation. // This implements a requirement for InfraCluster providers to be able to ignore externally managed