Skip to content

Commit

Permalink
Include authorization in the event
Browse files Browse the repository at this point in the history
  • Loading branch information
Maciej Winnicki committed Jun 7, 2018
1 parent b01bbf9 commit 918ac12
Show file tree
Hide file tree
Showing 6 changed files with 207 additions and 133 deletions.
10 changes: 8 additions & 2 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func New(eventType TypeName, mimeType string, payload interface{}) *Event {
Data: payload,
Extensions: map[string]interface{}{
"eventgateway": map[string]interface{}{
"transformed": true,
"transformed": "true",
"transformation-version": TransformationVersion,
},
},
Expand Down Expand Up @@ -136,7 +136,7 @@ func (e Event) MarshalLogObject(enc zapcore.ObjectEncoder) error {
enc.AddString("contentType", e.ContentType)
}
if e.Extensions != nil {
e.Extensions.MarshalLogObject(enc)
enc.AddObject("extensions", e.Extensions)
}

payload, _ := json.Marshal(e.Data)
Expand Down
10 changes: 5 additions & 5 deletions event/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ var newTests = []struct {
Data: []byte("test"),
Extensions: map[string]interface{}{
"eventgateway": map[string]interface{}{
"transformed": true,
"transformed": "true",
"transformation-version": eventpkg.TransformationVersion,
},
},
Expand All @@ -99,7 +99,7 @@ var newTests = []struct {
Data: eventpkg.SystemEventReceivedData{},
Extensions: map[string]interface{}{
"eventgateway": map[string]interface{}{
"transformed": true,
"transformed": "true",
"transformation-version": eventpkg.TransformationVersion,
},
},
Expand Down Expand Up @@ -223,7 +223,7 @@ var fromRequestTests = []struct {
ContentType: "application/octet-stream",
Data: []byte("hey there"),
Extensions: map[string]interface{}{
"eventgateway": map[string]interface{}{"transformed": true, "transformation-version": "0.1"}},
"eventgateway": map[string]interface{}{"transformed": "true", "transformation-version": "0.1"}},
},
},
{
Expand Down Expand Up @@ -264,7 +264,7 @@ var fromRequestTests = []struct {
ContentType: "application/json",
Data: map[string]interface{}{"eventType": "user.created"},
Extensions: map[string]interface{}{
"eventgateway": map[string]interface{}{"transformed": true, "transformation-version": "0.1"}},
"eventgateway": map[string]interface{}{"transformed": "true", "transformation-version": "0.1"}},
},
},
{
Expand All @@ -286,7 +286,7 @@ var fromRequestTests = []struct {
Body: []byte("hey there"),
},
Extensions: map[string]interface{}{
"eventgateway": map[string]interface{}{"transformed": true, "transformation-version": "0.1"}},
"eventgateway": map[string]interface{}{"transformed": "true", "transformation-version": "0.1"}},
},
},
}
3 changes: 2 additions & 1 deletion internal/zap/strings.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package zap

import (
"encoding/json"

"go.uber.org/zap/zapcore"
)

Expand All @@ -23,7 +24,7 @@ type MapStringInterface map[string]interface{}
func (msi MapStringInterface) MarshalLogObject(enc zapcore.ObjectEncoder) error {
for key, val := range msi {
v, err := json.Marshal(val)
if err != nil {
if err == nil {
enc.AddString(key, string(v))
} else {
return err
Expand Down
39 changes: 26 additions & 13 deletions router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"strings"
"sync"

"github.com/jinzhu/copier"
"github.com/rs/cors"
"go.uber.org/zap"

Expand Down Expand Up @@ -87,10 +88,10 @@ func (router *Router) ServeHTTP(w http.ResponseWriter, r *http.Request) {

syncSubscriber := router.targetCache.SyncSubscriber(r.Method, path, event.EventType)
if syncSubscriber != nil { // There is sync subscriber and possibly async subscribers also
router.handleSyncSubscription(path, event, *syncSubscriber, w, r)
router.handleSyncSubscription(path, *event, *syncSubscriber, w, r)
}

router.handleAsyncSubscriptions(r.Method, path, event, r)
router.handleAsyncSubscriptions(r.Method, path, *event, r)
if syncSubscriber == nil {
w.WriteHeader(http.StatusAccepted)
}
Expand Down Expand Up @@ -148,19 +149,19 @@ var (
errUnableToLookUpRegisteredFunction = errors.New("unable to look up registered function")
)

func (router *Router) handleSyncSubscription(path string, event *eventpkg.Event, subscriber SyncSubscriber, w http.ResponseWriter, r *http.Request) {
func (router *Router) handleSyncSubscription(path string, event eventpkg.Event, subscriber SyncSubscriber, w http.ResponseWriter, r *http.Request) {
// metrics & logs
metricEventsReceived.WithLabelValues(subscriber.Space, string(event.EventType)).Inc()
router.log.Debug("Event received.", zap.String("path", path), zap.String("space", subscriber.Space), zap.Object("event", event))
err := router.emitSystemEventReceived(path, *event, r.Header)
err := router.emitSystemEventReceived(path, event, r.Header)
if err != nil {
router.log.Debug("Event processing stopped because sync plugin subscription returned an error.",
zap.Object("event", event),
zap.Error(err))
return
}

err = router.authorizeEventType(subscriber.Space, event, r)
err = router.authorizeEventType(subscriber.Space, &event, r)
if err != nil {
w.WriteHeader(http.StatusForbidden)
return
Expand All @@ -172,7 +173,7 @@ func (router *Router) handleSyncSubscription(path string, event *eventpkg.Event,
httpRequestData.Params = subscriber.Params
event.Data = httpRequestData
}
router.httpRequestHandler(subscriber.Space, subscriber.FunctionID, event)(w, r)
router.httpRequestHandler(subscriber.Space, subscriber.FunctionID, &event)(w, r)

metricEventsProcessed.WithLabelValues(subscriber.Space, string(event.EventType)).Inc()
}
Expand Down Expand Up @@ -219,7 +220,7 @@ func (router *Router) httpRequestHandler(space string, backingFunction function.
}

// handleAsyncSubscriptions fetched events subscribers, runs authorization and enqueues event in the queue
func (router *Router) handleAsyncSubscriptions(method, path string, event *eventpkg.Event, r *http.Request) {
func (router *Router) handleAsyncSubscriptions(method, path string, event eventpkg.Event, r *http.Request) {
if event.IsSystem() {
router.log.Debug("System event received.", zap.Object("event", event))
}
Expand All @@ -228,9 +229,11 @@ func (router *Router) handleAsyncSubscriptions(method, path string, event *event
for _, subscriber := range subscribers {
metricEventsReceived.WithLabelValues(subscriber.Space, "custom").Inc()

err := router.authorizeEventType(subscriber.Space, event, r)
subEvent := eventpkg.Event{}
copier.Copy(&subEvent, &event)
err := router.authorizeEventType(subscriber.Space, &subEvent, r)
if err == nil {
router.enqueueWork(method, path, subscriber.Space, subscriber.FunctionID, *event)
router.enqueueWork(method, path, subscriber.Space, subscriber.FunctionID, subEvent)
}
}
}
Expand Down Expand Up @@ -331,6 +334,16 @@ func (router *Router) authorizeEventType(space string, event *eventpkg.Event, r
zap.Object("event", event))
return errors.New(authorizerResponse.AuthorizationError.Message)
}

if egExternsions, ok := event.Extensions["eventgateway"]; ok {
egExternsions.(map[string]interface{})["authorization"] = authorizerResponse.Authorization
} else {
event.Extensions = map[string]interface{}{
"eventgateway": map[string]interface{}{
"authorization": authorizerResponse.Authorization,
},
}
}
}

return nil
Expand Down Expand Up @@ -425,7 +438,7 @@ func (router *Router) emitSystemEventReceived(path string, event eventpkg.Event,
mimeJSON,
eventpkg.SystemEventReceivedData{Path: path, Event: event, Headers: ihttp.FlattenHeader(header)},
)
router.handleAsyncSubscriptions(http.MethodPost, "/", system, nil)
router.handleAsyncSubscriptions(http.MethodPost, "/", *system, nil)
return router.plugins.React(system)
}

Expand All @@ -435,7 +448,7 @@ func (router *Router) emitSystemFunctionInvoking(space string, functionID functi
mimeJSON,
eventpkg.SystemFunctionInvokingData{Space: space, FunctionID: functionID, Event: event},
)
router.handleAsyncSubscriptions(http.MethodPost, "/", system, nil)
router.handleAsyncSubscriptions(http.MethodPost, "/", *system, nil)

metricEventsReceived.WithLabelValues(space, string(eventpkg.SystemFunctionInvokingType)).Inc()

Expand All @@ -447,7 +460,7 @@ func (router *Router) emitSystemFunctionInvoked(space string, functionID functio
eventpkg.SystemFunctionInvokedType,
mimeJSON,
eventpkg.SystemFunctionInvokedData{Space: space, FunctionID: functionID, Event: event, Result: result})
router.handleAsyncSubscriptions(http.MethodPost, "/", system, nil)
router.handleAsyncSubscriptions(http.MethodPost, "/", *system, nil)

metricEventsReceived.WithLabelValues(space, string(eventpkg.SystemFunctionInvokedType)).Inc()

Expand All @@ -460,7 +473,7 @@ func (router *Router) emitSystemFunctionInvocationFailed(space string, functionI
eventpkg.SystemFunctionInvocationFailedType,
mimeJSON,
eventpkg.SystemFunctionInvocationFailedData{Space: space, FunctionID: functionID, Event: event, Error: err})
router.handleAsyncSubscriptions(http.MethodPost, "/", system, nil)
router.handleAsyncSubscriptions(http.MethodPost, "/", *system, nil)

metricEventsReceived.WithLabelValues(space, string(eventpkg.SystemFunctionInvocationFailedType)).Inc()
}
Expand Down
Loading

0 comments on commit 918ac12

Please sign in to comment.