Skip to content

Commit

Permalink
Merge pull request #45 from weaveworks/28_events
Browse files Browse the repository at this point in the history
feat: event bus implementation
  • Loading branch information
richardcase authored Aug 10, 2021
2 parents a5f6420 + ba6d67a commit e0ca430
Show file tree
Hide file tree
Showing 10 changed files with 486 additions and 0 deletions.
1 change: 1 addition & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ run:
allow-parallel-runners: true
skip-dirs:
- pkg/microvm/mock
- pkg/events/mock

issues:
max-same-issues: 0
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ require (
github.com/go-openapi/strfmt v0.19.5 // indirect
github.com/golang/mock v1.6.0
github.com/golang/protobuf v1.5.2 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.5.0 // indirect
github.com/gruntwork-io/terratest v0.36.3
Expand All @@ -24,6 +25,7 @@ require (
github.com/spf13/cobra v1.2.1 // indirect
github.com/spf13/pflag v1.0.5
github.com/spf13/viper v1.8.1 // indirect
github.com/vmware/transport-go v1.1.0 // indirect
google.golang.org/genproto v0.0.0-20210617175327-b9e0b3197ced // indirect
google.golang.org/grpc v1.38.0 // indirect
google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.0.0 // indirect
Expand Down
16 changes: 16 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,7 @@ github.com/evanphx/json-patch v4.2.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLi
github.com/evanphx/json-patch v4.9.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
github.com/evanphx/json-patch v4.11.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
github.com/fatih/color v1.9.0 h1:8xPHl4/q1VyqGIPif1F+1V3Y3lSmrq01EabUW3CoW5s=
github.com/fatih/color v1.9.0/go.mod h1:eQcE1qtQxscV5RaZvpXrrb8Drkc3/DdQ+uUYCNjL+zU=
github.com/firecracker-microvm/firecracker-go-sdk v0.22.0 h1:hk28AO5ArAX9iHomi6axNLK+6+8gz1wi3ooNsUTlSFQ=
github.com/firecracker-microvm/firecracker-go-sdk v0.22.0/go.mod h1:lr7w/zmzIi72h+dDMQsRmmKS63EKvnFPEpg2KrjX2X0=
Expand Down Expand Up @@ -463,6 +464,8 @@ github.com/go-sql-driver/mysql v1.4.1 h1:g24URVg0OFbNUTx9qqY1IRZ9D9z3iPyi5zKhQZp
github.com/go-sql-driver/mysql v1.4.1/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w=
github.com/go-stack/stack v1.8.0 h1:5SgMzNM5HxrEjV0ww2lTmX6E2Izsfxas4+YHWRs3Lsk=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/go-stomp/stomp v2.0.3+incompatible h1:B8gYzgV3rXQRHoK8itI9RZDyimEBAwKbJjfoRKpdxwc=
github.com/go-stomp/stomp v2.0.3+incompatible/go.mod h1:VqCtqNZv1226A1/79yh+rMiFUcfY3R109np+7ke4n0c=
github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE=
github.com/go-test/deep v1.0.3 h1:ZrJSEWsXzPOxaZnFteGEfooLba+ju3FYIbOrS+rQd68=
github.com/go-test/deep v1.0.3/go.mod h1:wGDj63lr65AM2AQyKZd/NYHGb0R+1RLqB8NKt3aSFNA=
Expand Down Expand Up @@ -589,6 +592,8 @@ github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.2.0 h1:qJYtXnJRWmpe7m/3XlyhrsLrEURqHRM2kxzoxXqyUDs=
github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk=
github.com/googleapis/gnostic v0.0.0-20170729233727-0c5108395e2d/go.mod h1:sJBsCZ4ayReDTBIg8b9dl28c5xFWyhBTVRp3pOg5EKY=
Expand All @@ -602,8 +607,12 @@ github.com/gordonklaus/ineffassign v0.0.0-20200309095847-7953dde2c7bf/go.mod h1:
github.com/gorilla/handlers v0.0.0-20150720190736-60c7bfde3e33/go.mod h1:Qkdc/uu4tH4g6mTK6auzZ766c4CA0Ng8+o/OAirnOIQ=
github.com/gorilla/mux v1.7.2/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
github.com/gorilla/mux v1.7.3/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI=
github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
github.com/gorilla/websocket v0.0.0-20170926233335-4201258b820c/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA=
github.com/grpc-ecosystem/go-grpc-middleware v1.0.0/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs=
Expand Down Expand Up @@ -739,10 +748,14 @@ github.com/markbates/safe v1.0.1/go.mod h1:nAqgmRi7cY2nqMc92/bSEeQA+R4OheNU2T1kN
github.com/marstr/guid v1.1.0/go.mod h1:74gB1z2wpxxInTG6yaqA7KrtM0NZ+RbrcqDvYHefzho=
github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU=
github.com/mattn/go-colorable v0.1.4/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE=
github.com/mattn/go-colorable v0.1.8 h1:c1ghPdyEDarC70ftn0y+A/Ee++9zz8ljHG1b13eJ0s8=
github.com/mattn/go-colorable v0.1.8/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc=
github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
github.com/mattn/go-isatty v0.0.4/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s=
github.com/mattn/go-isatty v0.0.11/go.mod h1:PhnuNfih5lzO57/f3n+odYbM4JtupLOxQOAqxQCu2WE=
github.com/mattn/go-isatty v0.0.12 h1:wuysRhFDzyxgEmMf5xjvJ2M9dZoWAXNNr5LSBS7uHXY=
github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU=
github.com/mattn/go-runewidth v0.0.2/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU=
github.com/mattn/go-shellwords v1.0.3/go.mod h1:3xCvwCdWdlDJUrvuMn7Wuy9eWs4pE8vqg+NOMyg4B2o=
github.com/mattn/go-zglob v0.0.1/go.mod h1:9fxibJccNxU2cnpIKLRRFA7zX7qhkJIQWBb449FYHOo=
Expand Down Expand Up @@ -1018,6 +1031,8 @@ github.com/vishvananda/netns v0.0.0-20200728191858-db3c7e526aae h1:4hwBBUfQCFe3C
github.com/vishvananda/netns v0.0.0-20200728191858-db3c7e526aae/go.mod h1:DD4vA1DwXk04H54A1oHXtwZmA0grkVMdPxx/VGLCah0=
github.com/vmihailenco/msgpack v3.3.3+incompatible/go.mod h1:fy3FlTQTDXWkZ7Bh6AcGMlsjHatGryHQYUTf1ShIgkk=
github.com/vmware/govmomi v0.20.3/go.mod h1:URlwyTFZX72RmxtxuaFL2Uj3fD1JTvZdx59bHWk6aFU=
github.com/vmware/transport-go v1.1.0 h1:68jUI4ZuTvoOuCc3ylvpWFT5HjYrVhqJTW79itV/pNE=
github.com/vmware/transport-go v1.1.0/go.mod h1:yVx33Ih199+jJbOTQRXuyNKm6Hu9nUH9nlM1gbj0buQ=
github.com/willf/bitset v1.1.11-0.20200630133818-d5bec3311243/go.mod h1:RjeCKbqT1RxIR/KWY6phxZiaY1IyutSBfGjNPySAYV4=
github.com/willf/bitset v1.1.11/go.mod h1:83CECat5yLh5zVOf4P1ErAgKA5UDvKtgyUABdr3+MjI=
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=
Expand Down Expand Up @@ -1269,6 +1284,7 @@ golang.org/x/sys v0.0.0-20191210023423-ac6580df4449/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20191228213918-04cbcbbfeed8/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200106162015-b016eb3dc98e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200113162924-86b910548bc1/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200120151820-655fe14d7479/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200122134326-e047566fdf82/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200124204421-9fbb57f87de9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
Expand Down
13 changes: 13 additions & 0 deletions pkg/events/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package events

import "fmt"

// ErrTopicNotFound is an error created when a topic with a specific name isn't found.
type ErrTopicNotFound struct {
Name string
}

// Error returns the error message.
func (e ErrTopicNotFound) Error() string {
return fmt.Sprintf("topic %s not found", e.Name)
}
41 changes: 41 additions & 0 deletions pkg/events/events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package events

import (
"context"

"github.com/google/uuid"
)

// Handler represents an event handling function.
type Handler func(e *Envelope)

// ErrorHandler represents an error handling function.
type ErrorHandler func(err error)

// Handlers represents a pair of event/error handlers.
type Handlers struct {
// Event is the event handler function.
Event Handler
// Error is the error handler function.
Error ErrorHandler
}

// EventBus is the interface that an event bus must implement.
type EventBus interface {
// CreateTopic will create a named topic (a.k.a channel or queue) for events.
CreateTopic(ctx context.Context, topic string) error
// Publish will publish an event to a specific topic.
Publish(ctx context.Context, topic string, event interface{}) error
// Subscribe will subscribe to events on a named topic and will call the relevant handlers.
Subscribe(ctx context.Context, topic string, handlers Handlers) error
}

// Envelope represents an event envelope.
type Envelope struct {
// ID is the unique identifier for the event.
ID uuid.UUID `json:"id"`
// Topic is the name of the topic the event originated from.
Topic string `json:"topic"`
// Event is the actual event payload.
Event interface{} `json:"event"`
}
3 changes: 3 additions & 0 deletions pkg/events/mock/gen.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package mock

//go:generate ../../../hack/tools/bin/mockgen -destination mock.go -package mock github.com/weaveworks/reignite/pkg/events EventBus
78 changes: 78 additions & 0 deletions pkg/events/mock/mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions pkg/events/transport/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package transport

import "errors"

var (
errHandlerRequired = errors.New("event handler is required")
errErrorHandlerRequired = errors.New("error handler is required")
errTopicRequired = errors.New("topic name is required")
)
101 changes: 101 additions & 0 deletions pkg/events/transport/transport.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package transport

import (
"context"
"fmt"

"github.com/vmware/transport-go/bus"
"github.com/vmware/transport-go/model"

"github.com/weaveworks/reignite/pkg/events"
)

// New creates a new event bus based on Transport (https://vmware.github.io/transport/).
func New() events.EventBus {
return &transportEvents{
eventBus: bus.GetBus(),
}
}

type transportEvents struct {
eventBus bus.EventBus
}

// CreateTopic will create a named topic (a.k.a channel or queue) for events.
func (te *transportEvents) CreateTopic(ctx context.Context, topic string) error {
if topic == "" {
return errTopicRequired
}

manager := te.eventBus.GetChannelManager()

if !manager.CheckChannelExists(topic) {
manager.CreateChannel(topic)
}

return nil
}

// Publish will publish an event to a specific topic.
func (te *transportEvents) Publish(ctx context.Context, topic string, event interface{}) error {
if topic == "" {
return errTopicRequired
}

manager := te.eventBus.GetChannelManager()

if !manager.CheckChannelExists(topic) {
return events.ErrTopicNotFound{Name: topic}
}

if err := te.eventBus.SendRequestMessage(topic, event, nil); err != nil {
return fmt.Errorf("sending message to channel: %w", err)
}

return nil
}

// Subscribe will subscribe to events on a named topic and will call the relevant handler.
func (te *transportEvents) Subscribe(ctx context.Context, topic string, handlers events.Handlers) error {
if handlers.Event == nil {
return errHandlerRequired
}
if handlers.Error == nil {
return errErrorHandlerRequired
}
if topic == "" {
return errTopicRequired
}

manager := te.eventBus.GetChannelManager()

if !manager.CheckChannelExists(topic) {
return events.ErrTopicNotFound{Name: topic}
}

h, err := te.eventBus.ListenRequestStream(topic)
if err != nil {
return fmt.Errorf("listening for transport events: %w", err)
}
h.Handle(te.subsciberHandler(handlers.Event), te.subsciberErrorHandler(handlers.Error))

return nil
}

func (te *transportEvents) subsciberHandler(handler events.Handler) bus.MessageHandlerFunction {
return func(msg *model.Message) {
evt := &events.Envelope{
ID: *msg.Id,
Topic: msg.Channel,
Event: msg.Payload,
}

handler(evt)
}
}

func (te *transportEvents) subsciberErrorHandler(errHandler events.ErrorHandler) bus.MessageErrorFunction {
return func(err error) {
errHandler(err)
}
}
Loading

0 comments on commit e0ca430

Please sign in to comment.