Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Filters created once #7213

Merged
merged 11 commits into from
Sep 22, 2023
77 changes: 52 additions & 25 deletions pkg/broker/filter/filter_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ type Handler struct {
triggerLister eventinglisters.TriggerLister
logger *zap.Logger
withContext func(ctx context.Context) context.Context
filtersMap *subscriptionsapi.FiltersMap
}

// NewHandler creates a new Handler and its associated EventReceiver.
Expand All @@ -81,32 +82,40 @@ func NewHandler(logger *zap.Logger, triggerInformer v1.TriggerInformer, reporter
MaxIdleConnsPerHost: defaultMaxIdleConnectionsPerHost,
})

fm := subscriptionsapi.NewFiltersMap()

triggerInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
trigger, ok := obj.(eventingv1.Trigger)
trigger, ok := obj.(*eventingv1.Trigger)
if !ok {
return
}
logger.Debug("Adding filter to filtersMap")
fm.Set(trigger, createSubscriptionsAPIFilters(logger, trigger))
kncloudevents.AddOrUpdateAddressableHandler(duckv1.Addressable{
URL: trigger.Status.SubscriberURI,
CACerts: trigger.Status.SubscriberCACerts,
})
},
UpdateFunc: func(_, obj interface{}) {
trigger, ok := obj.(eventingv1.Trigger)
trigger, ok := obj.(*eventingv1.Trigger)
if !ok {
return
}
logger.Debug("Updating filter in filtersMap")
fm.Set(trigger, createSubscriptionsAPIFilters(logger, trigger))
kncloudevents.AddOrUpdateAddressableHandler(duckv1.Addressable{
URL: trigger.Status.SubscriberURI,
CACerts: trigger.Status.SubscriberCACerts,
})
},
DeleteFunc: func(obj interface{}) {
trigger, ok := obj.(eventingv1.Trigger)
trigger, ok := obj.(*eventingv1.Trigger)
if !ok {
return
}
logger.Debug("Deleting filter in filtersMap")
fm.Delete(trigger)
kncloudevents.DeleteAddressableHandler(duckv1.Addressable{
URL: trigger.Status.SubscriberURI,
CACerts: trigger.Status.SubscriberCACerts,
Expand All @@ -119,6 +128,7 @@ func NewHandler(logger *zap.Logger, triggerInformer v1.TriggerInformer, reporter
triggerLister: triggerInformer.Lister(),
logger: logger,
withContext: wc,
filtersMap: fm,
}, nil
}

Expand Down Expand Up @@ -206,7 +216,7 @@ func (h *Handler) ServeHTTP(writer http.ResponseWriter, request *http.Request) {

// Check if the event should be sent.
ctx = logging.WithLogger(ctx, h.logger.Sugar().With(zap.String("trigger", fmt.Sprintf("%s/%s", t.GetNamespace(), t.GetName()))))
filterResult := filterEvent(ctx, t.Spec, *event)
filterResult := h.filterEvent(ctx, t, *event)

if filterResult == eventfilter.FailFilter {
// We do not count the event. The event will be counted in the broker ingress.
Expand Down Expand Up @@ -353,71 +363,88 @@ func (h *Handler) getTrigger(ref path.NamespacedNameUID) (*eventingv1.Trigger, e
return t, nil
}

func filterEvent(ctx context.Context, triggerSpec eventingv1.TriggerSpec, event cloudevents.Event) eventfilter.FilterResult {
func (h *Handler) filterEvent(ctx context.Context, trigger *eventingv1.Trigger, event cloudevents.Event) eventfilter.FilterResult {
switch {
case feature.FromContext(ctx).IsEnabled(feature.NewTriggerFilters) && len(triggerSpec.Filters) > 0:
logging.FromContext(ctx).Debugw("New trigger filters feature is enabled. Applying new filters.", zap.Any("filters", triggerSpec.Filters))
return applySubscriptionsAPIFilters(ctx, triggerSpec.Filters, event)
case triggerSpec.Filter != nil:
logging.FromContext(ctx).Debugw("Applying attributes filter.", zap.Any("filter", triggerSpec.Filter))
return applyAttributesFilter(ctx, triggerSpec.Filter, event)
case feature.FromContext(ctx).IsEnabled(feature.NewTriggerFilters):
logging.FromContext(ctx).Debugw("New trigger filters feature is enabled. Applying new filters.", zap.Any("filters", trigger.Spec.Filters))
filter, ok := h.filtersMap.Get(trigger)
if !ok {
// trigger filters haven't update in the map yet - need to create them on the fly
if len(trigger.Spec.Filters) == 0 {
logging.FromContext(ctx).Debug("Found no filters for trigger", zap.String("triggerFilterKey", fmt.Sprintf("%s.%s", trigger.Namespace, trigger.Name)))
return eventfilter.NoFilter
}
return applySubscriptionsAPIFilters(ctx, trigger, event)
}
return filter.Filter(ctx, event)
case trigger.Spec.Filter != nil:
logging.FromContext(ctx).Debugw("Applying attributes filter.", zap.Any("filter", trigger.Spec.Filter))
return applyAttributesFilter(ctx, trigger.Spec.Filter, event)
default:
logging.FromContext(ctx).Debugw("Found no filters in trigger", zap.Any("triggerSpec", triggerSpec))
logging.FromContext(ctx).Debugw("Found no filters in trigger", zap.Any("triggerSpec", trigger.Spec.Filter))
return eventfilter.NoFilter
}
}

func applySubscriptionsAPIFilters(ctx context.Context, filters []eventingv1.SubscriptionsAPIFilter, event cloudevents.Event) eventfilter.FilterResult {
return subscriptionsapi.NewAllFilter(materializeFiltersList(ctx, filters)...).Filter(ctx, event)
func applySubscriptionsAPIFilters(ctx context.Context, trigger *eventingv1.Trigger, event cloudevents.Event) eventfilter.FilterResult {
return createSubscriptionsAPIFilters(logging.FromContext(ctx).Desugar(), trigger).Filter(ctx, event)
}

func createSubscriptionsAPIFilters(logger *zap.Logger, trigger *eventingv1.Trigger) eventfilter.Filter {
if len(trigger.Spec.Filters) == 0 {
logger.Debug("Found no filters for trigger", zap.Any("trigger.Spec", trigger.Spec))
return subscriptionsapi.NewNoFilter()
}
return subscriptionsapi.NewAllFilter(materializeFiltersList(logger, trigger.Spec.Filters)...)
}

func materializeSubscriptionsAPIFilter(ctx context.Context, filter eventingv1.SubscriptionsAPIFilter) eventfilter.Filter {
func materializeSubscriptionsAPIFilter(logger *zap.Logger, filter eventingv1.SubscriptionsAPIFilter) eventfilter.Filter {
var materializedFilter eventfilter.Filter
var err error
switch {
case len(filter.Exact) > 0:
// The webhook validates that this map has only a single key:value pair.
materializedFilter, err = subscriptionsapi.NewExactFilter(filter.Exact)
if err != nil {
logging.FromContext(ctx).Debugw("Invalid exact expression", zap.Any("filters", filter.Exact), zap.Error(err))
logger.Debug("Invalid exact expression", zap.Any("filters", filter.Exact), zap.Error(err))
return nil
}
case len(filter.Prefix) > 0:
// The webhook validates that this map has only a single key:value pair.
materializedFilter, err = subscriptionsapi.NewPrefixFilter(filter.Prefix)
if err != nil {
logging.FromContext(ctx).Debugw("Invalid prefix expression", zap.Any("filters", filter.Exact), zap.Error(err))
logger.Debug("Invalid prefix expression", zap.Any("filters", filter.Exact), zap.Error(err))
return nil
}
case len(filter.Suffix) > 0:
// The webhook validates that this map has only a single key:value pair.
materializedFilter, err = subscriptionsapi.NewSuffixFilter(filter.Suffix)
if err != nil {
logging.FromContext(ctx).Debugw("Invalid suffix expression", zap.Any("filters", filter.Exact), zap.Error(err))
logger.Debug("Invalid suffix expression", zap.Any("filters", filter.Exact), zap.Error(err))
return nil
}
case len(filter.All) > 0:
materializedFilter = subscriptionsapi.NewAllFilter(materializeFiltersList(ctx, filter.All)...)
materializedFilter = subscriptionsapi.NewAllFilter(materializeFiltersList(logger, filter.All)...)
case len(filter.Any) > 0:
materializedFilter = subscriptionsapi.NewAnyFilter(materializeFiltersList(ctx, filter.Any)...)
materializedFilter = subscriptionsapi.NewAnyFilter(materializeFiltersList(logger, filter.Any)...)
case filter.Not != nil:
materializedFilter = subscriptionsapi.NewNotFilter(materializeSubscriptionsAPIFilter(ctx, *filter.Not))
materializedFilter = subscriptionsapi.NewNotFilter(materializeSubscriptionsAPIFilter(logger, *filter.Not))
case filter.CESQL != "":
if materializedFilter, err = subscriptionsapi.NewCESQLFilter(filter.CESQL); err != nil {
// This is weird, CESQL expression should be validated when Trigger's are created.
logging.FromContext(ctx).Debugw("Found an Invalid CE SQL expression", zap.String("expression", filter.CESQL))
logger.Debug("Found an Invalid CE SQL expression", zap.String("expression", filter.CESQL))
return nil
}
}
return materializedFilter
}

func materializeFiltersList(ctx context.Context, filters []eventingv1.SubscriptionsAPIFilter) []eventfilter.Filter {
func materializeFiltersList(logger *zap.Logger, filters []eventingv1.SubscriptionsAPIFilter) []eventfilter.Filter {
materializedFilters := make([]eventfilter.Filter, 0, len(filters))
for _, f := range filters {
f := materializeSubscriptionsAPIFilter(ctx, f)
f := materializeSubscriptionsAPIFilter(logger, f)
if f == nil {
logging.FromContext(ctx).Warnw("Failed to parse filter. Skipping filter.", zap.Any("filter", f))
logger.Warn("Failed to parse filter. Skipping filter.", zap.Any("filter", f))
continue
}
materializedFilters = append(materializedFilters, f)
Expand Down
10 changes: 8 additions & 2 deletions pkg/broker/filter/filter_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,12 @@ import (
"go.uber.org/zap/zaptest"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"knative.dev/pkg/apis"

eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
"knative.dev/eventing/pkg/apis/feature"
"knative.dev/eventing/pkg/broker"
"knative.dev/eventing/pkg/eventfilter/subscriptionsapi"
"knative.dev/pkg/apis"
"knative.dev/pkg/logging"
reconcilertesting "knative.dev/pkg/reconciler/testing"

triggerinformerfake "knative.dev/eventing/pkg/client/injection/informers/eventing/v1/trigger/fake"
Expand Down Expand Up @@ -594,6 +595,8 @@ func TestReceiver_WithSubscriptionsAPI(t *testing.T) {
s := httptest.NewServer(&fh)
defer s.Close()

filtersMap := subscriptionsapi.NewFiltersMap()

// Replace the SubscriberURI to point at our fake server.
for _, trig := range tc.triggers {
if trig.Status.SubscriberURI != nil && trig.Status.SubscriberURI.String() == toBeReplaced {
Expand All @@ -605,6 +608,7 @@ func TestReceiver_WithSubscriptionsAPI(t *testing.T) {
trig.Status.SubscriberURI = url
}
triggerinformerfake.Get(ctx).Informer().GetStore().Add(trig)
filtersMap.Set(trig, createSubscriptionsAPIFilters(logging.FromContext(ctx).Desugar(), trig))
}
reporter := &mockReporter{}
r, err := NewHandler(
Expand All @@ -620,6 +624,8 @@ func TestReceiver_WithSubscriptionsAPI(t *testing.T) {
t.Fatal("Unable to create receiver:", err)
}

r.filtersMap = filtersMap

e := tc.event
if e == nil {
e = makeEvent()
Expand Down
2 changes: 2 additions & 0 deletions pkg/eventfilter/attributes/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ func (attrs attributesFilter) Filter(ctx context.Context, event cloudevents.Even
return eventfilter.PassFilter
}

func (attrs attributesFilter) Cleanup() {}

func LookupAttribute(event cloudevents.Event, attr string) (interface{}, bool) {
// Set standard context attributes. The attributes available may not be
// exactly the same as the attributes defined in the current version of the
Expand Down
2 changes: 2 additions & 0 deletions pkg/eventfilter/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,4 +61,6 @@ func (x FilterResult) Or(y FilterResult) FilterResult {
type Filter interface {
// Filter compute the predicate on the provided event and returns the result of the matching
Filter(ctx context.Context, event cloudevents.Event) FilterResult
// Cleanup cleans up any resources/goroutines used by the filter
Cleanup()
}
2 changes: 2 additions & 0 deletions pkg/eventfilter/subscriptionsapi/all_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,6 @@ func (filter allFilter) Filter(ctx context.Context, event cloudevents.Event) eve
return res
}

func (filter allFilter) Cleanup() {}

var _ eventfilter.Filter = allFilter{}
4 changes: 4 additions & 0 deletions pkg/eventfilter/subscriptionsapi/all_filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,16 @@ func (*passFilter) Filter(ctx context.Context, event cloudevents.Event) eventfil
return eventfilter.PassFilter
}

func (*passFilter) Cleanup() {}

type failFilter struct{}

func (*failFilter) Filter(ctx context.Context, event cloudevents.Event) eventfilter.FilterResult {
return eventfilter.FailFilter
}

func (*failFilter) Cleanup() {}

func TestAllFilter_Flat(t *testing.T) {
tests := map[string]struct {
filter eventfilter.Filter
Expand Down
2 changes: 2 additions & 0 deletions pkg/eventfilter/subscriptionsapi/any_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,6 @@ func (filter anyFilter) Filter(ctx context.Context, event cloudevents.Event) eve
return res
}

func (filter anyFilter) Cleanup() {}

var _ eventfilter.Filter = &anyFilter{}
2 changes: 2 additions & 0 deletions pkg/eventfilter/subscriptionsapi/cesql_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,3 +71,5 @@ func (filter *ceSQLFilter) Filter(ctx context.Context, event cloudevents.Event)
}
return eventfilter.PassFilter
}

func (filter *ceSQLFilter) Cleanup() {}
2 changes: 2 additions & 0 deletions pkg/eventfilter/subscriptionsapi/exact_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,3 +62,5 @@ func (filter *exactFilter) Filter(ctx context.Context, event cloudevents.Event)
logger.Debugw("Performing an exact match ", zap.Any("filters", filter.filters), zap.Any("event", event))
return filter.attrsFilter.Filter(ctx, event)
}

func (filter *exactFilter) Cleanup() {}
67 changes: 67 additions & 0 deletions pkg/eventfilter/subscriptionsapi/filters_map.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
Copyright 2023 The Knative Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package subscriptionsapi

import (
"fmt"
"sync"

eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
"knative.dev/eventing/pkg/eventfilter"
)

type FiltersMap struct {
filtersMap map[string]eventfilter.Filter
rwMutex sync.RWMutex
}

func NewFiltersMap() *FiltersMap {
return &FiltersMap{
filtersMap: make(map[string]eventfilter.Filter),
}
}
func (fm *FiltersMap) Set(trigger *eventingv1.Trigger, filter eventfilter.Filter) {
key := keyFromTrigger(trigger)
fm.rwMutex.Lock()
defer fm.rwMutex.Unlock()
if filter, found := fm.filtersMap[key]; found {
filter.Cleanup()
}
fm.filtersMap[key] = filter
}

func (fm *FiltersMap) Get(trigger *eventingv1.Trigger) (eventfilter.Filter, bool) {
key := keyFromTrigger(trigger)
fm.rwMutex.RLock()
defer fm.rwMutex.RUnlock()
res, found := fm.filtersMap[key]
return res, found
}

func (fm *FiltersMap) Delete(trigger *eventingv1.Trigger) {
key := keyFromTrigger(trigger)
fm.rwMutex.Lock()
defer fm.rwMutex.Unlock()
if filter, found := fm.filtersMap[key]; found {
filter.Cleanup()
}
delete(fm.filtersMap, key)
}

func keyFromTrigger(trigger *eventingv1.Trigger) string {
return fmt.Sprintf("%s.%s", trigger.Namespace, trigger.Name)
}
Loading