Skip to content

Commit

Permalink
Expose selector for default cache pre-filtering
Browse files Browse the repository at this point in the history
  • Loading branch information
Danil-Grigorev committed Jul 14, 2023
1 parent 09bc5c5 commit 0930b48
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 154 deletions.
4 changes: 2 additions & 2 deletions util/predicates/cluster_predicates.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func ClusterUpdateUnpaused(logger logr.Logger) predicate.Funcs {
// handler.EnqueueRequestsFromMapFunc(clusterToMachines)
// predicates.ClusterUnpaused(r.Log),
// )
func ClusterUnpaused(logger logr.Logger) predicate.Funcs {
func ClusterUnpaused(logger logr.Logger) predicate.Predicate {
log := logger.WithValues("predicate", "ClusterUnpaused")

// Use any to ensure we process either create or update events we care about
Expand Down Expand Up @@ -218,7 +218,7 @@ func ClusterControlPlaneInitialized(logger logr.Logger) predicate.Funcs {
// handler.EnqueueRequestsFromMapFunc(clusterToMachines)
// predicates.ClusterUnpausedAndInfrastructureReady(r.Log),
// )
func ClusterUnpausedAndInfrastructureReady(logger logr.Logger) predicate.Funcs {
func ClusterUnpausedAndInfrastructureReady(logger logr.Logger) predicate.Predicate {
log := logger.WithValues("predicate", "ClusterUnpausedAndInfrastructureReady")

// Only continue processing create events if both not paused and infrastructure is ready
Expand Down
153 changes: 44 additions & 109 deletions util/predicates/expression_predicates.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,138 +20,73 @@ import (
"fmt"

"github.com/go-logr/logr"
"github.com/google/cel-go/cel"
"github.com/google/cel-go/checker/decls"
"github.com/google/cel-go/common/types"
"github.com/pkg/errors"
"k8s.io/apimachinery/pkg/runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"

"sigs.k8s.io/controller-runtime/pkg/predicate"
)

var expressionMatcher *ExpressionMatcher
clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
)

// ExpressionMatcher holds initialized CEL program for evaluation of incoming objects on events
// and filtering which objects to reconcile.
type ExpressionMatcher struct {
program cel.Program
// LabelMatcher holds initialized Label Selector predicate for evaluation of incoming
// objects on events and filtering which objects to reconcile.
type LabelMatcher struct {
selector labels.Selector
predicate predicate.Predicate
}

// InitExpressionMatcher initializes expression which will apply on all processed objects in events in every controller.
func InitExpressionMatcher(log logr.Logger, expression string) error {
if expression == "" {
return nil
// ComposeFilterExpression will return a valid label selector string representation
// by converting watchFilter to a default for watchExpression.
func ComposeFilterExpression(watchExpression, watchFilter string) string {
if watchExpression == "" && watchFilter != "" {
return fmt.Sprintf("%s = %s", clusterv1.WatchLabel, watchFilter)
}
program, err := getProgram(log, expression)
if err != nil {
log.Error(err, fmt.Sprintf("Unable to compile CEL program for %s", expression))
return err
}
log.Info("Initialized expression predicate for the given rule: ", expression)

expressionMatcher = &ExpressionMatcher{program}
return nil
return watchExpression
}

// GetExpressionMatcher returns existing ExpressionMatcher.
func GetExpressionMatcher() *ExpressionMatcher {
return expressionMatcher
}

func (m *ExpressionMatcher) matches(log logr.Logger, o client.Object) bool {
matches, err := m.matchesExpression(log, o)
if err != nil {
log.V(6).Info(fmt.Sprintf("Rejecting object %+v due to errror: %s", o, err))
return false
}
if !matches {
log.V(6).Info(fmt.Sprintf("Rejecting object - does not match expression %+v", o))
// InitLabelMatcher initializes expression which will apply on all processed objects in events in every controller.
func InitLabelMatcher(log logr.Logger, labelExpression string) (LabelMatcher, error) {
matcher := LabelMatcher{}
if labelExpression == "" {
return matcher, nil
}

return matches
}

func getProgram(log logr.Logger, expression string) (cel.Program, error) {
declarations := cel.Declarations(
decls.NewVar("self", decls.NewMapType(decls.String, decls.Dyn)),
)

env, err := cel.NewEnv(declarations)
expr, err := metav1.ParseToLabelSelector(labelExpression)
if err != nil {
return nil, errors.Wrap(err, "Unable to create CEL environment")
log.Error(err, fmt.Sprintf("Unable to compile LabelSelector from %s", labelExpression))
return matcher, err
}
log.Info("Initialized label selector predicate for the given rule: ", labelExpression)

ast, issues := env.Compile(expression)
if issues != nil && issues.Err() != nil {
log.Error(issues.Err(), "Unable to compile provided expression %s", expression)
return nil, issues.Err()
}
if ast.OutputType() != cel.BoolType {
return nil, errors.New("Expression should evaluate to boolean value")
}

prg, err := env.Program(ast)
if err != nil {
log.Error(err, "Unable to create CEL instance")
return nil, err
}

return prg, nil
}

func (m *ExpressionMatcher) matchesExpression(log logr.Logger, o client.Object) (bool, error) {
if o == nil {
return false, nil
}

unstructured, err := runtime.DefaultUnstructuredConverter.ToUnstructured(o)
labelPredicate, err := predicate.LabelSelectorPredicate(*expr)
if err != nil {
log.Error(err, "Unable to convert object to unstructured")
return false, err
log.Error(err, fmt.Sprintf("Unable to compile label selector for %s", labelExpression))
return matcher, err
}

result, details, err := m.program.Eval(map[string]interface{}{
"self": unstructured,
})
selector, err := metav1.LabelSelectorAsSelector(expr)
if err != nil {
log.Error(err, fmt.Sprintf("Can't eval object with provided expression: %v", details))
return false, err
log.Error(err, "Unable to parse label selector for cache filtering for %s", labelExpression)
return matcher, err
}

return result == types.True, nil
matcher.selector = selector
matcher.predicate = labelPredicate
return matcher, nil
}

// PassThrough returns a predicate accepting anything.
func PassThrough() predicate.Funcs {
return predicate.Funcs{
CreateFunc: func(e event.CreateEvent) bool { return true },
UpdateFunc: func(e event.UpdateEvent) bool { return true },
DeleteFunc: func(e event.DeleteEvent) bool { return true },
GenericFunc: func(e event.GenericEvent) bool { return true },
}
// Selector returns compiled label selector if it was initialized from expression
func (m *LabelMatcher) Selector() labels.Selector {
return m.selector
}

// Matches returns a predicate that returns true only if the provided
// CEL expression matches on filtered resource.
func (m *ExpressionMatcher) Matches(log logr.Logger) predicate.Funcs {
if m == nil {
log.Info("Skipping filter apply, no expression was set")
return PassThrough()
// Matches returns a predicate that accepts objects only matching
// watch-filter or watch-expression value.
func (m *LabelMatcher) Matches(log logr.Logger) predicate.Predicate {
if m.predicate == nil {
log.Info("Skipping filter apply, no label expression was set")
return predicate.Funcs{}
}

return predicate.Funcs{
UpdateFunc: func(e event.UpdateEvent) bool {
return m.matches(log.WithValues("predicate", "ResourceMatchesExpression", "eventType", "update"), e.ObjectNew)
},
CreateFunc: func(e event.CreateEvent) bool {
return m.matches(log.WithValues("predicate", "ResourceMatchesExpression", "eventType", "create"), e.Object)
},
DeleteFunc: func(e event.DeleteEvent) bool {
return m.matches(log.WithValues("predicate", "ResourceMatchesExpression", "eventType", "delete"), e.Object)
},
GenericFunc: func(e event.GenericEvent) bool {
return m.matches(log.WithValues("predicate", "ResourceMatchesExpression", "eventType", "generic"), e.Object)
},
}
return m.predicate
}
57 changes: 14 additions & 43 deletions util/predicates/generic_predicates.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@ import (
)

// All returns a predicate that returns true only if all given predicates return true.
func All(logger logr.Logger, predicates ...predicate.Funcs) predicate.Funcs {
func All(logger logr.Logger, predicates ...predicate.Predicate) predicate.Predicate {
return predicate.Funcs{
UpdateFunc: func(e event.UpdateEvent) bool {
log := logger.WithValues("predicateAggregation", "All")
for _, p := range predicates {
if !p.UpdateFunc(e) {
if !p.Update(e) {
log.V(6).Info("One of the provided predicates returned false, blocking further processing")
return false
}
Expand All @@ -45,7 +45,7 @@ func All(logger logr.Logger, predicates ...predicate.Funcs) predicate.Funcs {
CreateFunc: func(e event.CreateEvent) bool {
log := logger.WithValues("predicateAggregation", "All")
for _, p := range predicates {
if !p.CreateFunc(e) {
if !p.Create(e) {
log.V(6).Info("One of the provided predicates returned false, blocking further processing")
return false
}
Expand All @@ -56,7 +56,7 @@ func All(logger logr.Logger, predicates ...predicate.Funcs) predicate.Funcs {
DeleteFunc: func(e event.DeleteEvent) bool {
log := logger.WithValues("predicateAggregation", "All")
for _, p := range predicates {
if !p.DeleteFunc(e) {
if !p.Delete(e) {
log.V(6).Info("One of the provided predicates returned false, blocking further processing")
return false
}
Expand All @@ -67,7 +67,7 @@ func All(logger logr.Logger, predicates ...predicate.Funcs) predicate.Funcs {
GenericFunc: func(e event.GenericEvent) bool {
log := logger.WithValues("predicateAggregation", "All")
for _, p := range predicates {
if !p.GenericFunc(e) {
if !p.Generic(e) {
log.V(6).Info("One of the provided predicates returned false, blocking further processing")
return false
}
Expand All @@ -79,12 +79,12 @@ func All(logger logr.Logger, predicates ...predicate.Funcs) predicate.Funcs {
}

// Any returns a predicate that returns true only if any given predicate returns true.
func Any(logger logr.Logger, predicates ...predicate.Funcs) predicate.Funcs {
func Any(logger logr.Logger, predicates ...predicate.Predicate) predicate.Predicate {
return predicate.Funcs{
UpdateFunc: func(e event.UpdateEvent) bool {
log := logger.WithValues("predicateAggregation", "Any")
for _, p := range predicates {
if p.UpdateFunc(e) {
if p.Update(e) {
log.V(6).Info("One of the provided predicates returned true, allowing further processing")
return true
}
Expand All @@ -95,7 +95,7 @@ func Any(logger logr.Logger, predicates ...predicate.Funcs) predicate.Funcs {
CreateFunc: func(e event.CreateEvent) bool {
log := logger.WithValues("predicateAggregation", "Any")
for _, p := range predicates {
if p.CreateFunc(e) {
if p.Create(e) {
log.V(6).Info("One of the provided predicates returned true, allowing further processing")
return true
}
Expand All @@ -106,7 +106,7 @@ func Any(logger logr.Logger, predicates ...predicate.Funcs) predicate.Funcs {
DeleteFunc: func(e event.DeleteEvent) bool {
log := logger.WithValues("predicateAggregation", "Any")
for _, p := range predicates {
if p.DeleteFunc(e) {
if p.Delete(e) {
log.V(6).Info("One of the provided predicates returned true, allowing further processing")
return true
}
Expand All @@ -117,7 +117,7 @@ func Any(logger logr.Logger, predicates ...predicate.Funcs) predicate.Funcs {
GenericFunc: func(e event.GenericEvent) bool {
log := logger.WithValues("predicateAggregation", "Any")
for _, p := range predicates {
if p.GenericFunc(e) {
if p.Generic(e) {
log.V(6).Info("One of the provided predicates returned true, allowing further processing")
return true
}
Expand All @@ -130,21 +130,8 @@ func Any(logger logr.Logger, predicates ...predicate.Funcs) predicate.Funcs {

// ResourceHasFilterLabel returns a predicate that returns true only if the provided resource contains
// a label with the WatchLabel key and the configured label value exactly.
func ResourceHasFilterLabel(logger logr.Logger, labelValue string) predicate.Funcs {
return predicate.Funcs{
UpdateFunc: func(e event.UpdateEvent) bool {
return processIfLabelMatch(logger.WithValues("predicate", "ResourceHasFilterLabel", "eventType", "update"), e.ObjectNew, labelValue)
},
CreateFunc: func(e event.CreateEvent) bool {
return processIfLabelMatch(logger.WithValues("predicate", "ResourceHasFilterLabel", "eventType", "create"), e.Object, labelValue)
},
DeleteFunc: func(e event.DeleteEvent) bool {
return processIfLabelMatch(logger.WithValues("predicate", "ResourceHasFilterLabel", "eventType", "delete"), e.Object, labelValue)
},
GenericFunc: func(e event.GenericEvent) bool {
return processIfLabelMatch(logger.WithValues("predicate", "ResourceHasFilterLabel", "eventType", "generic"), e.Object, labelValue)
},
}
func ResourceHasFilterLabel(logger logr.Logger, labelSelector LabelMatcher) predicate.Predicate {
return labelSelector.Matches(logger)
}

// ResourceNotPaused returns a Predicate that returns true only if the provided resource does not contain the
Expand Down Expand Up @@ -180,8 +167,8 @@ func ResourceNotPaused(logger logr.Logger) predicate.Funcs {

// ResourceNotPausedAndHasFilterLabel returns a predicate that returns true only if the
// ResourceNotPaused and ResourceHasFilterLabel predicates return true.
func ResourceNotPausedAndHasFilterLabel(logger logr.Logger, labelValue string) predicate.Funcs {
return All(logger, ResourceNotPaused(logger), ResourceHasFilterLabel(logger, labelValue))
func ResourceNotPausedAndHasFilterLabel(logger logr.Logger, labelSelector LabelMatcher) predicate.Predicate {
return All(logger, ResourceNotPaused(logger), labelSelector.Matches(logger))
}

func processIfNotPaused(logger logr.Logger, obj client.Object) bool {
Expand All @@ -195,22 +182,6 @@ func processIfNotPaused(logger logr.Logger, obj client.Object) bool {
return true
}

func processIfLabelMatch(logger logr.Logger, obj client.Object, labelValue string) bool {
// Return early if no labelValue was set.
if labelValue == "" {
return true
}

kind := strings.ToLower(obj.GetObjectKind().GroupVersionKind().Kind)
log := logger.WithValues("namespace", obj.GetNamespace(), kind, obj.GetName())
if labels.HasWatchLabel(obj, labelValue) {
log.V(6).Info("Resource matches label, will attempt to map resource")
return true
}
log.V(4).Info("Resource does not match label, will not attempt to map resource")
return false
}

// ResourceIsNotExternallyManaged returns a predicate that returns true only if the resource does not contain
// the externally managed annotation.
// This implements a requirement for InfraCluster providers to be able to ignore externally managed
Expand Down

0 comments on commit 0930b48

Please sign in to comment.