diff --git a/.golangci.yml b/.golangci.yml index 41125d71..1bf97d5f 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -4,6 +4,7 @@ run: allow-parallel-runners: true skip-dirs: - pkg/microvm/mock + - pkg/events/mock issues: max-same-issues: 0 diff --git a/go.mod b/go.mod index eb9419e7..4f833c48 100644 --- a/go.mod +++ b/go.mod @@ -9,6 +9,7 @@ require ( github.com/go-openapi/strfmt v0.19.5 // indirect github.com/golang/mock v1.6.0 github.com/golang/protobuf v1.5.2 // indirect + github.com/google/uuid v1.3.0 // indirect github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.5.0 // indirect github.com/gruntwork-io/terratest v0.36.3 @@ -24,6 +25,7 @@ require ( github.com/spf13/cobra v1.2.1 // indirect github.com/spf13/pflag v1.0.5 github.com/spf13/viper v1.8.1 // indirect + github.com/vmware/transport-go v1.1.0 // indirect google.golang.org/genproto v0.0.0-20210617175327-b9e0b3197ced // indirect google.golang.org/grpc v1.38.0 // indirect google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.0.0 // indirect diff --git a/go.sum b/go.sum index 8ad2a2d0..8b091486 100644 --- a/go.sum +++ b/go.sum @@ -355,6 +355,7 @@ github.com/evanphx/json-patch v4.2.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLi github.com/evanphx/json-patch v4.9.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk= github.com/evanphx/json-patch v4.11.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= +github.com/fatih/color v1.9.0 h1:8xPHl4/q1VyqGIPif1F+1V3Y3lSmrq01EabUW3CoW5s= github.com/fatih/color v1.9.0/go.mod h1:eQcE1qtQxscV5RaZvpXrrb8Drkc3/DdQ+uUYCNjL+zU= github.com/firecracker-microvm/firecracker-go-sdk v0.22.0 h1:hk28AO5ArAX9iHomi6axNLK+6+8gz1wi3ooNsUTlSFQ= github.com/firecracker-microvm/firecracker-go-sdk v0.22.0/go.mod h1:lr7w/zmzIi72h+dDMQsRmmKS63EKvnFPEpg2KrjX2X0= @@ -463,6 +464,8 @@ github.com/go-sql-driver/mysql v1.4.1 h1:g24URVg0OFbNUTx9qqY1IRZ9D9z3iPyi5zKhQZp github.com/go-sql-driver/mysql v1.4.1/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= github.com/go-stack/stack v1.8.0 h1:5SgMzNM5HxrEjV0ww2lTmX6E2Izsfxas4+YHWRs3Lsk= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= +github.com/go-stomp/stomp v2.0.3+incompatible h1:B8gYzgV3rXQRHoK8itI9RZDyimEBAwKbJjfoRKpdxwc= +github.com/go-stomp/stomp v2.0.3+incompatible/go.mod h1:VqCtqNZv1226A1/79yh+rMiFUcfY3R109np+7ke4n0c= github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE= github.com/go-test/deep v1.0.3 h1:ZrJSEWsXzPOxaZnFteGEfooLba+ju3FYIbOrS+rQd68= github.com/go-test/deep v1.0.3/go.mod h1:wGDj63lr65AM2AQyKZd/NYHGb0R+1RLqB8NKt3aSFNA= @@ -589,6 +592,8 @@ github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+ github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.2.0 h1:qJYtXnJRWmpe7m/3XlyhrsLrEURqHRM2kxzoxXqyUDs= github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= +github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg= github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk= github.com/googleapis/gnostic v0.0.0-20170729233727-0c5108395e2d/go.mod h1:sJBsCZ4ayReDTBIg8b9dl28c5xFWyhBTVRp3pOg5EKY= @@ -602,8 +607,12 @@ github.com/gordonklaus/ineffassign v0.0.0-20200309095847-7953dde2c7bf/go.mod h1: github.com/gorilla/handlers v0.0.0-20150720190736-60c7bfde3e33/go.mod h1:Qkdc/uu4tH4g6mTK6auzZ766c4CA0Ng8+o/OAirnOIQ= github.com/gorilla/mux v1.7.2/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs= github.com/gorilla/mux v1.7.3/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs= +github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI= +github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= github.com/gorilla/websocket v0.0.0-20170926233335-4201258b820c/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ= github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ= +github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc= github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA= github.com/grpc-ecosystem/go-grpc-middleware v1.0.0/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs= @@ -739,10 +748,14 @@ github.com/markbates/safe v1.0.1/go.mod h1:nAqgmRi7cY2nqMc92/bSEeQA+R4OheNU2T1kN github.com/marstr/guid v1.1.0/go.mod h1:74gB1z2wpxxInTG6yaqA7KrtM0NZ+RbrcqDvYHefzho= github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU= github.com/mattn/go-colorable v0.1.4/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE= +github.com/mattn/go-colorable v0.1.8 h1:c1ghPdyEDarC70ftn0y+A/Ee++9zz8ljHG1b13eJ0s8= +github.com/mattn/go-colorable v0.1.8/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc= github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4= github.com/mattn/go-isatty v0.0.4/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4= github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s= github.com/mattn/go-isatty v0.0.11/go.mod h1:PhnuNfih5lzO57/f3n+odYbM4JtupLOxQOAqxQCu2WE= +github.com/mattn/go-isatty v0.0.12 h1:wuysRhFDzyxgEmMf5xjvJ2M9dZoWAXNNr5LSBS7uHXY= +github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= github.com/mattn/go-runewidth v0.0.2/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU= github.com/mattn/go-shellwords v1.0.3/go.mod h1:3xCvwCdWdlDJUrvuMn7Wuy9eWs4pE8vqg+NOMyg4B2o= github.com/mattn/go-zglob v0.0.1/go.mod h1:9fxibJccNxU2cnpIKLRRFA7zX7qhkJIQWBb449FYHOo= @@ -1018,6 +1031,8 @@ github.com/vishvananda/netns v0.0.0-20200728191858-db3c7e526aae h1:4hwBBUfQCFe3C github.com/vishvananda/netns v0.0.0-20200728191858-db3c7e526aae/go.mod h1:DD4vA1DwXk04H54A1oHXtwZmA0grkVMdPxx/VGLCah0= github.com/vmihailenco/msgpack v3.3.3+incompatible/go.mod h1:fy3FlTQTDXWkZ7Bh6AcGMlsjHatGryHQYUTf1ShIgkk= github.com/vmware/govmomi v0.20.3/go.mod h1:URlwyTFZX72RmxtxuaFL2Uj3fD1JTvZdx59bHWk6aFU= +github.com/vmware/transport-go v1.1.0 h1:68jUI4ZuTvoOuCc3ylvpWFT5HjYrVhqJTW79itV/pNE= +github.com/vmware/transport-go v1.1.0/go.mod h1:yVx33Ih199+jJbOTQRXuyNKm6Hu9nUH9nlM1gbj0buQ= github.com/willf/bitset v1.1.11-0.20200630133818-d5bec3311243/go.mod h1:RjeCKbqT1RxIR/KWY6phxZiaY1IyutSBfGjNPySAYV4= github.com/willf/bitset v1.1.11/go.mod h1:83CECat5yLh5zVOf4P1ErAgKA5UDvKtgyUABdr3+MjI= github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I= @@ -1269,6 +1284,7 @@ golang.org/x/sys v0.0.0-20191210023423-ac6580df4449/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20191228213918-04cbcbbfeed8/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200106162015-b016eb3dc98e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200113162924-86b910548bc1/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200120151820-655fe14d7479/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200122134326-e047566fdf82/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200124204421-9fbb57f87de9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= diff --git a/pkg/events/errors.go b/pkg/events/errors.go new file mode 100644 index 00000000..1a823c14 --- /dev/null +++ b/pkg/events/errors.go @@ -0,0 +1,13 @@ +package events + +import "fmt" + +// ErrTopicNotFound is an error created when a topic with a specific name isn't found. +type ErrTopicNotFound struct { + Name string +} + +// Error returns the error message. +func (e ErrTopicNotFound) Error() string { + return fmt.Sprintf("topic %s not found", e.Name) +} diff --git a/pkg/events/events.go b/pkg/events/events.go new file mode 100644 index 00000000..751b6843 --- /dev/null +++ b/pkg/events/events.go @@ -0,0 +1,41 @@ +package events + +import ( + "context" + + "github.com/google/uuid" +) + +// Handler represents an event handling function. +type Handler func(e *Envelope) + +// ErrorHandler represents an error handling function. +type ErrorHandler func(err error) + +// Handlers represents a pair of event/error handlers. +type Handlers struct { + // Event is the event handler function. + Event Handler + // Error is the error handler function. + Error ErrorHandler +} + +// EventBus is the interface that an event bus must implement. +type EventBus interface { + // CreateTopic will create a named topic (a.k.a channel or queue) for events. + CreateTopic(ctx context.Context, topic string) error + // Publish will publish an event to a specific topic. + Publish(ctx context.Context, topic string, event interface{}) error + // Subscribe will subscribe to events on a named topic and will call the relevant handlers. + Subscribe(ctx context.Context, topic string, handlers Handlers) error +} + +// Envelope represents an event envelope. +type Envelope struct { + // ID is the unique identifier for the event. + ID uuid.UUID `json:"id"` + // Topic is the name of the topic the event originated from. + Topic string `json:"topic"` + // Event is the actual event payload. + Event interface{} `json:"event"` +} diff --git a/pkg/events/mock/gen.go b/pkg/events/mock/gen.go new file mode 100644 index 00000000..a260987b --- /dev/null +++ b/pkg/events/mock/gen.go @@ -0,0 +1,3 @@ +package mock + +//go:generate ../../../hack/tools/bin/mockgen -destination mock.go -package mock github.com/weaveworks/reignite/pkg/events EventBus diff --git a/pkg/events/mock/mock.go b/pkg/events/mock/mock.go new file mode 100644 index 00000000..ae3c6d2a --- /dev/null +++ b/pkg/events/mock/mock.go @@ -0,0 +1,78 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/weaveworks/reignite/pkg/events (interfaces: EventBus) + +// Package mock is a generated GoMock package. +package mock + +import ( + context "context" + reflect "reflect" + + gomock "github.com/golang/mock/gomock" + events "github.com/weaveworks/reignite/pkg/events" +) + +// MockEventBus is a mock of EventBus interface. +type MockEventBus struct { + ctrl *gomock.Controller + recorder *MockEventBusMockRecorder +} + +// MockEventBusMockRecorder is the mock recorder for MockEventBus. +type MockEventBusMockRecorder struct { + mock *MockEventBus +} + +// NewMockEventBus creates a new mock instance. +func NewMockEventBus(ctrl *gomock.Controller) *MockEventBus { + mock := &MockEventBus{ctrl: ctrl} + mock.recorder = &MockEventBusMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockEventBus) EXPECT() *MockEventBusMockRecorder { + return m.recorder +} + +// CreateTopic mocks base method. +func (m *MockEventBus) CreateTopic(arg0 context.Context, arg1 string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CreateTopic", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// CreateTopic indicates an expected call of CreateTopic. +func (mr *MockEventBusMockRecorder) CreateTopic(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateTopic", reflect.TypeOf((*MockEventBus)(nil).CreateTopic), arg0, arg1) +} + +// Publish mocks base method. +func (m *MockEventBus) Publish(arg0 context.Context, arg1 string, arg2 interface{}) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Publish", arg0, arg1, arg2) + ret0, _ := ret[0].(error) + return ret0 +} + +// Publish indicates an expected call of Publish. +func (mr *MockEventBusMockRecorder) Publish(arg0, arg1, arg2 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Publish", reflect.TypeOf((*MockEventBus)(nil).Publish), arg0, arg1, arg2) +} + +// Subscribe mocks base method. +func (m *MockEventBus) Subscribe(arg0 context.Context, arg1 string, arg2 events.Handler, arg3 events.ErrorHandler) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Subscribe", arg0, arg1, arg2, arg3) + ret0, _ := ret[0].(error) + return ret0 +} + +// Subscribe indicates an expected call of Subscribe. +func (mr *MockEventBusMockRecorder) Subscribe(arg0, arg1, arg2, arg3 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Subscribe", reflect.TypeOf((*MockEventBus)(nil).Subscribe), arg0, arg1, arg2, arg3) +} diff --git a/pkg/events/transport/errors.go b/pkg/events/transport/errors.go new file mode 100644 index 00000000..574e9e1c --- /dev/null +++ b/pkg/events/transport/errors.go @@ -0,0 +1,9 @@ +package transport + +import "errors" + +var ( + errHandlerRequired = errors.New("event handler is required") + errErrorHandlerRequired = errors.New("error handler is required") + errTopicRequired = errors.New("topic name is required") +) diff --git a/pkg/events/transport/transport.go b/pkg/events/transport/transport.go new file mode 100644 index 00000000..54448946 --- /dev/null +++ b/pkg/events/transport/transport.go @@ -0,0 +1,101 @@ +package transport + +import ( + "context" + "fmt" + + "github.com/vmware/transport-go/bus" + "github.com/vmware/transport-go/model" + + "github.com/weaveworks/reignite/pkg/events" +) + +// New creates a new event bus based on Transport (https://vmware.github.io/transport/). +func New() events.EventBus { + return &transportEvents{ + eventBus: bus.GetBus(), + } +} + +type transportEvents struct { + eventBus bus.EventBus +} + +// CreateTopic will create a named topic (a.k.a channel or queue) for events. +func (te *transportEvents) CreateTopic(ctx context.Context, topic string) error { + if topic == "" { + return errTopicRequired + } + + manager := te.eventBus.GetChannelManager() + + if !manager.CheckChannelExists(topic) { + manager.CreateChannel(topic) + } + + return nil +} + +// Publish will publish an event to a specific topic. +func (te *transportEvents) Publish(ctx context.Context, topic string, event interface{}) error { + if topic == "" { + return errTopicRequired + } + + manager := te.eventBus.GetChannelManager() + + if !manager.CheckChannelExists(topic) { + return events.ErrTopicNotFound{Name: topic} + } + + if err := te.eventBus.SendRequestMessage(topic, event, nil); err != nil { + return fmt.Errorf("sending message to channel: %w", err) + } + + return nil +} + +// Subscribe will subscribe to events on a named topic and will call the relevant handler. +func (te *transportEvents) Subscribe(ctx context.Context, topic string, handlers events.Handlers) error { + if handlers.Event == nil { + return errHandlerRequired + } + if handlers.Error == nil { + return errErrorHandlerRequired + } + if topic == "" { + return errTopicRequired + } + + manager := te.eventBus.GetChannelManager() + + if !manager.CheckChannelExists(topic) { + return events.ErrTopicNotFound{Name: topic} + } + + h, err := te.eventBus.ListenRequestStream(topic) + if err != nil { + return fmt.Errorf("listening for transport events: %w", err) + } + h.Handle(te.subsciberHandler(handlers.Event), te.subsciberErrorHandler(handlers.Error)) + + return nil +} + +func (te *transportEvents) subsciberHandler(handler events.Handler) bus.MessageHandlerFunction { + return func(msg *model.Message) { + evt := &events.Envelope{ + ID: *msg.Id, + Topic: msg.Channel, + Event: msg.Payload, + } + + handler(evt) + } +} + +func (te *transportEvents) subsciberErrorHandler(errHandler events.ErrorHandler) bus.MessageErrorFunction { + return func(err error) { + errHandler(err) + } +} diff --git a/pkg/events/transport/transport_test.go b/pkg/events/transport/transport_test.go new file mode 100644 index 00000000..a68ef06a --- /dev/null +++ b/pkg/events/transport/transport_test.go @@ -0,0 +1,222 @@ +package transport_test + +import ( + "context" + "testing" + "time" + + . "github.com/onsi/gomega" + + "github.com/weaveworks/reignite/pkg/events" + "github.com/weaveworks/reignite/pkg/events/transport" +) + +func TestTransport_SimplePubSub(t *testing.T) { + RegisterTestingT(t) + + ctx := context.Background() + trans := transport.New() + + topicName := "test" + messageReceived := false + errorReceived := false + + handler := func(e *events.Envelope) { + messageReceived = true + } + errHandler := func(err error) { + errorReceived = true + } + + err := trans.CreateTopic(ctx, topicName) + Expect(err).NotTo(HaveOccurred()) + + err = trans.Subscribe(ctx, topicName, events.Handlers{ + Event: handler, + Error: errHandler, + }) + Expect(err).NotTo(HaveOccurred()) + + err = trans.Publish(ctx, topicName, "someevent") + Expect(err).NotTo(HaveOccurred()) + + time.Sleep(1 * time.Second) + + Expect(messageReceived).To(BeTrue()) + Expect(errorReceived).To(BeFalse()) +} + +func TestTransport_MultipleSubscribers(t *testing.T) { + RegisterTestingT(t) + + ctx := context.Background() + trans := transport.New() + + topicName := "test" + sub1MessageReceived := false + sub2MessageReceived := false + errorReceived := false + + handler1 := func(e *events.Envelope) { + sub1MessageReceived = true + } + handler2 := func(e *events.Envelope) { + sub2MessageReceived = true + } + errHandler := func(err error) { + errorReceived = true + } + + err := trans.CreateTopic(ctx, topicName) + Expect(err).NotTo(HaveOccurred()) + + err = trans.Subscribe(ctx, topicName, events.Handlers{ + Event: handler1, + Error: errHandler, + }) + Expect(err).NotTo(HaveOccurred()) + + err = trans.Subscribe(ctx, topicName, events.Handlers{ + Event: handler2, + Error: errHandler, + }) + Expect(err).NotTo(HaveOccurred()) + + err = trans.Publish(ctx, topicName, "someevent") + Expect(err).NotTo(HaveOccurred()) + + time.Sleep(1 * time.Second) + + Expect(sub1MessageReceived).To(BeTrue()) + Expect(sub2MessageReceived).To(BeTrue()) + Expect(errorReceived).To(BeFalse()) +} + +func TestTransport_SubscribeUnknownTopic(t *testing.T) { + RegisterTestingT(t) + + ctx := context.Background() + trans := transport.New() + + topicName := "doesntexist" + + handler := func(e *events.Envelope) {} + + errHandler := func(err error) {} + + err := trans.Subscribe(ctx, topicName, events.Handlers{ + Event: handler, + Error: errHandler, + }) + Expect(err).To(HaveOccurred()) +} + +func TestTransport_SubscribeEmptyTopic(t *testing.T) { + RegisterTestingT(t) + + ctx := context.Background() + trans := transport.New() + + topicName := "" + + handler := func(e *events.Envelope) {} + + errHandler := func(err error) {} + + err := trans.Subscribe(ctx, topicName, events.Handlers{ + Event: handler, + Error: errHandler, + }) + Expect(err).To(HaveOccurred()) +} + +func TestTransport_PublishUnknownTopic(t *testing.T) { + RegisterTestingT(t) + + ctx := context.Background() + trans := transport.New() + + topicName := "doesntexist" + + err := trans.Publish(ctx, topicName, "someevent") + Expect(err).To(HaveOccurred()) +} + +func TestTransport_PublishEmptyTopic(t *testing.T) { + RegisterTestingT(t) + + ctx := context.Background() + trans := transport.New() + + topicName := "" + + err := trans.Publish(ctx, topicName, "someevent") + Expect(err).To(HaveOccurred()) +} + +func TestTransport_IdempotentCreateTopic(t *testing.T) { + RegisterTestingT(t) + + ctx := context.Background() + trans := transport.New() + + topicName := "test" + + err := trans.CreateTopic(ctx, topicName) + Expect(err).NotTo(HaveOccurred(), "creating topic first time should succeed") + + err = trans.CreateTopic(ctx, topicName) + Expect(err).NotTo(HaveOccurred(), "creating topic again time should succeed") +} + +func TestTransport_CreateEmptyTopic(t *testing.T) { + RegisterTestingT(t) + + ctx := context.Background() + trans := transport.New() + + topicName := "" + + err := trans.CreateTopic(ctx, topicName) + Expect(err).To(HaveOccurred(), "creating topic with a blank name should fail") +} + +func TestTransport_SubscribeNilHandler(t *testing.T) { + RegisterTestingT(t) + + ctx := context.Background() + trans := transport.New() + + topicName := "test" + + if err := trans.CreateTopic(ctx, topicName); err != nil { + t.Fatal(err) + } + + errHandler := func(err error) {} + + err := trans.Subscribe(ctx, topicName, events.Handlers{ + Error: errHandler, + }) + Expect(err).To(HaveOccurred()) +} + +func TestTransport_SubscribeNilErrorHandler(t *testing.T) { + RegisterTestingT(t) + + ctx := context.Background() + trans := transport.New() + + topicName := "test" + + if err := trans.CreateTopic(ctx, topicName); err != nil { + t.Fatal(err) + } + + handler := func(e *events.Envelope) {} + + err := trans.Subscribe(ctx, topicName, events.Handlers{ + Event: handler, + }) + Expect(err).To(HaveOccurred()) +}