A lightweight transactional message bus on top of RabbitMQ supporting:
- Supported messaging semantics
- One Way
- Duplex
- Publish/Subscribe
- Request/Reply (RPC)
- Long running processes via the Saga pattern
- Retry and backoffs
- Publisher confirms
- Reliable messaging and local service transactivity via Transaction Outbox pattern
- Deadlettering
- Structured logging
Planned:
- Deduplication of inbound messages
the v1.x branch contains the latest stable releases of grabbit and one should track that branch to get point and minor release updates.
- MySql > 8.0 (InnoDB)
- gob
- Avro
- Protobuf
- Opentracing
The following outlines the basic usage of grabbit. For a complete view of how you would use grabbit including how to write saga's and handle deadlettering refer to grabbit/tests package
import (
"github.com/wework/grabbit/gbus"
"github.com/wework/grabbit/gbus/builder"
)
Define a message
type SomeMessage struct {}
func(SomeMessage) SchemaName() string{
return "some.unique.namespace.somemessage"
}
Creating a transactional GBus instance
gb := builder.
New().
Bus("connection string to RabbitMQ").
Txnl("mysql", "connection string to mysql").
WithConfirms().
Build("name of your service")
Register a command handler
handler := func(invocation gbus.Invocation, message *gbus.BusMessage) error{
cmd, ok := message.Payload.(*SomeCommand)
if ok {
fmt.Printf("handler invoked with message %v", cmd)
return nil
}
return fmt.Errorf("failed to handle message")
}
gb.HandleMessage(SomeCommand{}, handler)
Register an event handler
eventHandler := func(invocation gbus.Invocation, message *gbus.BusMessage) {
evt, ok := message.Payload.(*SomeEvent)
if ok {
fmt.Printf("handler invoked with event %v", evt)
return nil
}
return fmt.Errorf("failed to handle event")
}
gb.HandleEvent("name of exchange", "name of topic", SomeEvent{}, eventHandler)
Start the bus
gb.Start()
defer gb.Shutdown()
Send a command
gb.Send(context.Background(), "name of service you are sending the command to", gbus.NewBusMessage(SomeCommand{}))
Publish an event
gb.Publish(context.Background(), "name of exchange", "name of topic", gbus.NewBusMessage(SomeEvent{}))
RPC style call
request := gbus.NewBusMessage(SomeRPCRequest{})
reply := gbus.NewBusMessage(SomeRPCReply{})
timeOut := 2 * time.Second
reply, e := gb.RPC(context.Background(), "name of service you are sending the request to", request, reply, timeOut)
if e != nil{
fmt.Printf("rpc call failed with error %v", e)
} else{
fmt.Printf("rpc call returned with reply %v", reply)
}
- ensure that you have the dependencies installed:
go get -v -t -d ./...
- make sure to first:
docker-compose up -V -d
- then to run the tests:
go test ./...