Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

adding signal filters #54

Merged
merged 1 commit into from
Jul 24, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions DEPENDENCIES.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,4 @@
| nats-io/gnatsd | Apache-2.0 |
| Shopify/sarama | MIT |
| stretchr/testify | https://github.com/stretchr/testify/blob/master/LICENSE |
| tidwall/gjson | MIT |
14 changes: 13 additions & 1 deletion Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ Argo Events is an open source event-based dependency manager for Kubernetes. The
Argo Events is a Kubernetes CRD which can manage dependencies using kubectl commands.
- [Learn about signals](./docs/signal-guide.md)
- [Learn about triggers](./docs/trigger-guide.md)
- [Review Sensor API](./docs/sensor-api.md)
- [Getting started](./docs/quickstart.md)
- [Want to contribute?](./CONTRIBUTING.md)
- See where the project is headed in the [roadmap](./ROADMAP.md)
4 changes: 2 additions & 2 deletions controller/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
FROM alpine:3.7
FROM scratch
COPY dist/sensor-controller /
CMD [ "/sensor-controller" ]
CMD [ "/sensor-controller" ]
155 changes: 155 additions & 0 deletions controller/signal-filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
package controller

import (
"encoding/json"
"fmt"
"reflect"
"strconv"
"strings"

"github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1"
"github.com/ghodss/yaml"
"github.com/tidwall/gjson"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// apply the signal filters to an event
func filterEvent(f v1alpha1.SignalFilter, event *v1alpha1.Event) (bool, error) {
dataRes, err := filterData(f.Data, event)
return filterTime(f.Time, &event.Context.EventTime) && filterContext(f.Context, &event.Context) && dataRes, err
}

// applyTimeFilter checks the eventTime against the timeFilter:
// 1. the eventTime is greater than or equal to the start time
// 2. the eventTime is less than the end time
// returns true if 1 and 2 are true and false otherwise
func filterTime(timeFilter *v1alpha1.TimeFilter, eventTime *metav1.Time) bool {
if timeFilter != nil && eventTime != nil {
return (timeFilter.Start.Before(eventTime) || timeFilter.Start.Equal(eventTime)) && eventTime.Before(&timeFilter.Stop)
}
return true
}

// applyContextFilter checks the expected EventContext against the actual EventContext
// values are only enforced if they are non-zero values
// map types check that the expected map is a subset of the actual map
func filterContext(expected *v1alpha1.EventContext, actual *v1alpha1.EventContext) bool {
res := true
if expected.EventType != "" {
res = res && expected.EventType == actual.EventType
}
if expected.EventTypeVersion != "" {
res = res && expected.EventTypeVersion == actual.EventTypeVersion
}
if expected.CloudEventsVersion != "" {
res = res && expected.CloudEventsVersion == actual.CloudEventsVersion
}
if expected.Source != nil {
res = res && reflect.DeepEqual(expected.Source, actual.Source)
}
if expected.SchemaURL != nil {
res = res && reflect.DeepEqual(expected.SchemaURL, actual.SchemaURL)
}
if expected.ContentType != "" {
res = res && expected.ContentType == actual.ContentType
}
eExtensionRes := mapIsSubset(expected.Extensions, actual.Extensions)
return res && eExtensionRes
}

// various supported media types
// TODO: add support for XML
const (
MediaTypeJSON string = "application/json"
//MediaTypeXML string = "application/xml"
MediaTypeYAML string = "application/yaml"
)

// applyDataFilter runs the dataFilter against the event's data
// returns (true, nil) when data passes filters, false otherwise
// TODO: split this function up into smaller pieces
func filterData(dataFilters []*v1alpha1.DataFilter, event *v1alpha1.Event) (bool, error) {
// TODO: use the event.Context.SchemaURL to figure out correct data format to unmarshal to
// for now, let's just use a simple map[string]interface{} for arbitrary data
if event == nil {
return false, fmt.Errorf("nil event")
}
if event.Data == nil || len(event.Data) == 0 {
return true, nil
}
raw := event.Data
var data map[string]interface{}
// contentType is formatted as: '{type}; charset="xxx"'
contents := strings.Split(event.Context.ContentType, ";")
if len(contents) < 1 {
return false, fmt.Errorf("event context ContentType not found: %s", contents)
}
switch contents[0] {
case MediaTypeJSON:
if err := json.Unmarshal(raw, &data); err != nil {
return false, err
}
/*
case MediaTypeXML:
if err := xml.Unmarshal(raw, &data); err != nil {
return false, err
}
*/
case MediaTypeYAML:
if err := yaml.Unmarshal(raw, &data); err != nil {
return false, err
}
default:
return false, fmt.Errorf("unsupported event content type: %s", event.Context.ContentType)
}
// now let's marshal the data back into json in order to do gjson processing
json, err := json.Marshal(data)
if err != nil {
return false, err
}
for _, f := range dataFilters {
res := gjson.Get(string(json), f.Path)
if !res.Exists() {
return false, nil
}
switch f.Type {
case v1alpha1.JSONTypeBool:
val, err := strconv.ParseBool(f.Value)
if err != nil {
return false, err
}
if val != res.Bool() {
return false, nil
}
case v1alpha1.JSONTypeNumber:
val, err := strconv.ParseFloat(f.Value, 64)
if err != nil {
return false, err
}
if val != res.Float() {
return false, nil
}
case v1alpha1.JSONTypeString:
if f.Value != res.Str {
return false, nil
}
default:
return false, fmt.Errorf("unsupported JSON type %s", f.Type)
}
}
return true, nil
}

// checks that m contains the k,v pairs of sub
func mapIsSubset(sub map[string]string, m map[string]string) bool {
for k, v := range sub {
val, ok := m[k]
if !ok {
return false
}
if v != val {
return false
}
}
return true
}
Loading