Skip to content

Commit

Permalink
fix: middleware refactor and more tests (#44)
Browse files Browse the repository at this point in the history
* refactor HandleFunc into a bunch of middlewares
* add tests for Authenticate and LoadEvent middleware
  • Loading branch information
hairmare authored Apr 21, 2022
1 parent 254012d commit 2ffda2b
Show file tree
Hide file tree
Showing 8 changed files with 299 additions and 121 deletions.
4 changes: 2 additions & 2 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package app
import (
"errors"

cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/adfinis-sygroup/mopsos/app/models"
"github.com/sirupsen/logrus"
"gorm.io/gorm"
)
Expand All @@ -27,7 +27,7 @@ func NewApp(c *Config, db *gorm.DB) (*App, error) {

func (a *App) Run() {
// eventChan is used to asynchronously pass events receiver from the Server to the Handler
eventChan := make(chan cloudevents.Event)
eventChan := make(chan models.EventData)

// handle events in background goroutine
go func() {
Expand Down
30 changes: 8 additions & 22 deletions app/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ package app
import (
"context"

otelObs "github.com/cloudevents/sdk-go/observability/opentelemetry/v2/client"
cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/sirupsen/logrus"
"gorm.io/gorm"
"gorm.io/gorm/clause"
Expand All @@ -26,36 +24,24 @@ func NewHandler(enableTracing bool, db *gorm.DB) *Handler {
}

// HandleEvents blocks on the queue and handles events
func (h *Handler) HandleEvents(eventChan chan cloudevents.Event) error {
func (h *Handler) HandleEvents(eventChan chan models.EventData) error {
// block on the event channel while ranging over its contents
for event := range eventChan {
err := h.HandleEvent(event)
for data := range eventChan {
err := h.HandleEvent(data)
if err != nil {
logrus.WithField("event", event).WithError(err).Error("failed to handle event")
logrus.WithField("event", data.Event).WithError(err).Error("failed to handle event")
}
}
return nil
}

func (h *Handler) HandleEvent(event cloudevents.Event) error {
log := logrus.WithField("event", event)
func (h *Handler) HandleEvent(data models.EventData) error {
log := logrus.WithField("event", data.Event)
log.Debug("received event")

ctx := context.Background()

if h.enableTracing {
ctx = otelObs.ExtractDistributedTracingExtension(ctx, event)
}

record := &models.Record{}

err := event.DataAs(record)
if err != nil {
log.WithError(err).Errorf("failed to unmarshal event data")
return err
}

log.WithField("record", record).Debug("creating record")
log.WithField("record", data.Record).Debug("creating record")

h.database.WithContext(ctx).Clauses(
clause.OnConflict{
Expand All @@ -67,7 +53,7 @@ func (h *Handler) HandleEvent(event cloudevents.Event) error {
},
UpdateAll: true,
},
).Create(record)
).Create(&data.Record)

return nil
}
17 changes: 10 additions & 7 deletions app/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
"github.com/adfinis-sygroup/mopsos/app/models"
)

func eventStub(record *models.Record) cloudevents.Event {
func eventStub(record *models.Record) models.EventData {
ctx := context.Background()

evt := cloudevents.NewEvent(cloudevents.VersionV1)
Expand Down Expand Up @@ -47,12 +47,15 @@ func eventStub(record *models.Record) cloudevents.Event {
fmt.Printf("%+v\n", ctx)
fmt.Printf("%+v\n", evt)

return evt
return models.EventData{
Event: evt,
Record: *record,
}
}

func Test_Handler_HandleEvent(t *testing.T) {
type args struct {
event cloudevents.Event
eventData models.EventData
}
tests := []struct {
name string
Expand All @@ -62,7 +65,7 @@ func Test_Handler_HandleEvent(t *testing.T) {
{
name: "simple event with minimal data",
args: args{
event: eventStub(
eventData: eventStub(
&models.Record{
ClusterName: "test",
ApplicationName: "test",
Expand All @@ -74,7 +77,7 @@ func Test_Handler_HandleEvent(t *testing.T) {
{
name: "event with complete data",
args: args{
event: eventStub(
eventData: eventStub(
&models.Record{
ClusterName: "cluster-name",
InstanceId: "cluster-instance",
Expand All @@ -88,7 +91,7 @@ func Test_Handler_HandleEvent(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
evt := tt.args.event
evt := tt.args.eventData.Event
evtRecord := &models.Record{}
err := evt.DataAs(evtRecord)
if err != nil {
Expand All @@ -107,7 +110,7 @@ func Test_Handler_HandleEvent(t *testing.T) {

h := mopsos.NewHandler(true, gdb)

if err := h.HandleEvent(evt); (err != nil) != tt.wantErr {
if err := h.HandleEvent(tt.args.eventData); (err != nil) != tt.wantErr {
t.Errorf("Handler.HandleEvent() error = %v, wantErr %v", err, tt.wantErr)
}

Expand Down
9 changes: 9 additions & 0 deletions app/models/eventData.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package models

import cloudevents "github.com/cloudevents/sdk-go/v2"

// EventData is the data structure for passing events between the server and the handler
type EventData struct {
Event cloudevents.Event
Record Record
}
138 changes: 84 additions & 54 deletions app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,8 @@ import (

"github.com/adfinis-sygroup/mopsos/app/models"
otelObs "github.com/cloudevents/sdk-go/observability/opentelemetry/v2/client"
cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/cloudevents/sdk-go/v2/binding"
"github.com/cloudevents/sdk-go/v2/protocol"
"github.com/cloudevents/sdk-go/v2/event"
httproto "github.com/cloudevents/sdk-go/v2/protocol/http"
http_logrus "github.com/improbable-eng/go-httpwares/logging/logrus"
"github.com/sirupsen/logrus"
Expand All @@ -20,9 +19,15 @@ import (
type Server struct {
config *Config

EventChan chan<- cloudevents.Event
EventChan chan<- models.EventData
}

type eventContext string

var ContextUsername eventContext = "mopsos.username"
var ContextEvent eventContext = "mopsos.event"
var ContextRecord eventContext = "mopsos.record"

// NewServer creates a server that receives CloudEvents from the network
func NewServer(cfg *Config) *Server {
return &Server{
Expand All @@ -33,91 +38,116 @@ func NewServer(cfg *Config) *Server {
// Start starts the server and listens for incoming events
func (s *Server) Start() {
mux := http.NewServeMux()
mux.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
// an example API handler
err := json.NewEncoder(w).Encode(map[string]bool{"ok": true})
if err != nil {
logrus.WithError(err).Error("error encoding response")
}
})
mux.Handle("/webhook", otelhttp.NewHandler(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
mux.HandleFunc("/health", s.HandleHealthCheck)
mux.Handle("/webhook", otelhttp.NewHandler(
s.Authenticate(
s.LoadEvent(
s.Validate(
http.HandlerFunc(s.HandleWebhook),
),
),
),
"webhook-receiver"),
)

logrus.WithField("listener", s.config.HttpListener).Info("Starting server")
loggingMiddleware := http_logrus.Middleware(
logrus.WithFields(logrus.Fields{}),
)(mux)
logrus.Fatal(http.ListenAndServe(s.config.HttpListener, loggingMiddleware))
}

// WithEventChannel sets the event channel for the server
func (s *Server) WithEventChannel(eventChan chan<- models.EventData) *Server {
s.EventChan = eventChan
return s
}

// Authenticate middleware handles checking credentials
func (s *Server) Authenticate(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// get basic auth credentials
username, password, ok := r.BasicAuth()
if !ok {
http.Error(w, "missing Authorization header", http.StatusUnauthorized)
return
}
if !s.checkAuth(username, password) {

logrus.WithFields(logrus.Fields{
"username": username,
}).Debug("checking credentials")
if s.config.BasicAuthUsers[username] != password {
http.Error(w, "invalid credentials", http.StatusUnauthorized)
return
}
ctx := context.WithValue(r.Context(), ContextUsername, username)

next.ServeHTTP(w, r.WithContext(ctx))
})
}

// LoadEvent middlerware loads event from the request
func (s *Server) LoadEvent(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// get event
message := httproto.NewMessageFromHttpRequest(r)
event, err := binding.ToEvent(context.TODO(), message)
event, err := binding.ToEvent(r.Context(), message)
if err != nil {
logrus.WithError(err).Error("failed to decode event")
return
}
if s.config.EnableTracing {
// inject the span context into the event so it can be use i.e. while inserting to the database
otelObs.InjectDistributedTracingExtension(r.Context(), *event)
}
logrus.Debugf("received event: %v", event)

// TODO consider how to harmonise this with what the handler does later on
ctx := context.WithValue(r.Context(), ContextEvent, event)
next.ServeHTTP(w, r.WithContext(ctx))
})
}

// Validate middleware handles checking received events for validity
func (s *Server) Validate(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
event := r.Context().Value(ContextEvent).(*event.Event)
record := &models.Record{}

if err := event.DataAs(record); err != nil {
logrus.WithError(err).Errorf("failed to unmarshal event data")
http.Error(w, "failed to unmarshal event data", http.StatusInternalServerError)
return
}

// reject record that have not been sent from the right auth
if record.ClusterName != username {
if record.ClusterName != r.Context().Value(ContextUsername).(string) {
http.Error(w, "event data does not match username", http.StatusUnauthorized)
return
}

err = s.HandleReceivedEvent(ctx, *event)
if err != nil {
logrus.WithError(err).Error("failed to handle event")
return
}
// return 202 accepted once the event is on the queue
w.WriteHeader(http.StatusAccepted)
}), "webhook-receiver"))

logrus.WithField("listener", s.config.HttpListener).Info("Starting server")
loggingMiddleware := http_logrus.Middleware(
logrus.WithFields(logrus.Fields{}),
)(mux)
logrus.Fatal(http.ListenAndServe(s.config.HttpListener, loggingMiddleware))
ctx := context.WithValue(r.Context(), ContextRecord, record)
next.ServeHTTP(w, r.WithContext(ctx))
})
}

// WithEventChannel sets the event channel for the server
func (s *Server) WithEventChannel(eventChan chan<- cloudevents.Event) *Server {
s.EventChan = eventChan
return s
func (s *Server) HandleHealthCheck(w http.ResponseWriter, r *http.Request) {
// an example API handler
err := json.NewEncoder(w).Encode(map[string]bool{"ok": true})
if err != nil {
logrus.WithError(err).Error("error encoding response")
}
}

// HandleReceivedEvent is the handler for the cloudevents receiver, public for testing
func (s *Server) HandleReceivedEvent(ctx context.Context, event cloudevents.Event) protocol.Result {

if s.config.EnableTracing {
// inject the span context into the event so it can be use i.e. while inserting to the database
otelObs.InjectDistributedTracingExtension(ctx, event)
}
func (s *Server) HandleWebhook(w http.ResponseWriter, r *http.Request) {
// get middleware data from context
event := r.Context().Value(ContextEvent).(*event.Event)
record := r.Context().Value(ContextRecord).(*models.Record)

// send the event to the main app via the async channel
s.EventChan <- event

logrus.Debugf("received event: %v", event)

return nil
}

// checkAuth checks if the username and password are correct
func (s *Server) checkAuth(username, password string) bool {
logrus.WithFields(logrus.Fields{
"username": username,
}).Debug("checking credentials")
return s.config.BasicAuthUsers[username] == password
s.EventChan <- models.EventData{
Event: *event,
Record: *record,
}
// return 202 accepted once the event is on the queue
w.WriteHeader(http.StatusAccepted)
}
Loading

0 comments on commit 2ffda2b

Please sign in to comment.