Gorabbit is a wrapper that provides high level and robust RabbitMQ operations through a client or a manager.
This wrapper depends on the official Go RabbitMQ plugin.
go get github.com/KardinalAI/gorabbit
The client's and manager's Mode
can also be set via an environment variable that will override the manually
entered value.
GORABBIT_MODE: debug # possible values: release or debug
The client and manager can also be completely disabled via the following environment variable:
GORABBIT_DISABLED: true # possible values: true, false, 1, or 0
Here is a visual representation of the always-on mechanism of a connection and channel when the KeepAlive
flag is set
to true.
The gorabbit client offers 2 main functionalities:
- Publishing
- Consuming
Additionally, the client also provides a ready check and a health check.
A client can be initialized via the constructor NewClient
. This constructor takes ClientOptions
as an optional
parameter.
Property | Description | Default Value |
---|---|---|
Host | The hostname of the RabbitMQ server | 127.0.0.1 |
Port | The port of the RabbitMQ server | 5672 |
Username | The plain authentication username | guest |
Password | The plain authentication password | guest |
Vhost | The specific vhost to use when connection to CloudAMQP | |
UseTLS | The flag that activates the use of TLS (amqps) | false |
ConnectionName | The desired connection name | Gorabbit |
KeepAlive | The flag that activates retry and re-connect mechanisms | true |
RetryDelay | The delay between each retry and re-connection | 3 seconds |
MaxRetry | The max number of message retry if it failed to process | 5 |
PublishingCacheTTL | The time to live for a failed publish when set in cache | 60 seconds |
PublishingCacheSize | The max number of failed publish to add into cache | 128 |
Mode | The mode defines whether logs are shown or not | Release |
Marshaller | The content type used for messages and how they're marshalled | JSON |
Passing nil
options will trigger the client to use default values (host, port, credentials, etc...)
via DefaultClientOptions()
.
client := gorabbit.NewClient(nil)
You can also explicitly pass DefaultClientOptions()
for a cleaner initialization.
client := gorabbit.NewClient(gorabbit.DefaultClientOptions())
Finally, passing a NewClientOptions()
method also initializes default values if not overwritten.
client := gorabbit.NewClient(gorabbit.NewClientOptions())
You can instantiate a client from environment variables, without the need of manually specifying options in the code.
client := gorabbit.NewClientFromEnv()
Here are the following supported environment variables:
RABBITMQ_HOST
: Defines the host,RABBITMQ_PORT
: Defines the port,RABBITMQ_USERNAME
: Defines the username,RABBITMQ_PASSWORD
: Defines the password,RABBITMQ_VHOST
: Defines the vhost,RABBITMQ_USE_TLS
: Defines whether to use TLS or no.RABBITMQ_CONNECTION_NAME
: Defines the desired connection name.
Note that environment variables are all optional, so missing keys will be replaced by their corresponding default.
We can input custom values for a specific property, either via the built-in builder or via direct struct initialization.
NewClientOptions()
and DefaultClientOptions()
both return an instance of *ClientOptions
that can act as a builder.
options := gorabbit.NewClientOptions().
SetMode(gorabbit.Debug).
SetCredentials("root", "password").
SetRetryDelay(5 * time.Second)
client := gorabbit.NewClient(options)
ℹ️ There is a setter method for each property.
ClientOptions
is an exported type, so it can be used directly.
options := gorabbit.ClientOptions {
Host: "localhost",
Port: 5673,
Username: "root",
Password: "password",
...
}
client := gorabbit.NewClient(&options)
⚠️ Direct initialization via the struct does not use default values on missing properties, so be sure to fill in every property available.
When a client is initialized, to prevent a leak, always disconnect it when no longer needed.
client := gorabbit.NewClient(gorabbit.DefaultClientOptions())
defer client.Disconnect()
To send a message, the client offers two simple methods: Publish
and PublishWithOptions
. The required arguments for
publishing are:
- Exchange (which exchange the message should be sent to)
- Routing Key
- Payload (
interface{}
, the object will be marshalled internally)
Example of sending a simple string
err := client.Publish("events_exchange", "event.foo.bar.created", "foo string")
Example of sending an object
type foo struct {
Action string
}
err := client.Publish("events_exchange", "event.foo.bar.created", foo{Action: "bar"})
Optionally, you can set the message's Priority
, DeliveryMode
and Expiration
via the PublishWithOptions
method.
options := gorabbit.SendOptions().
SetPriority(gorabbit.PriorityMedium).
SetDeliveryMode(gorabbit.Persistent).
SetTTL(5*time.Second)
err := client.PublishWithOptions("events_exchange", "event.foo.bar.created", "foo string", options)
ℹ️ If the
KeepAlive
flag is set to true when initializing the client, failed publishing will be cached once and re-published as soon as the channel is back up.
To consume messages, gorabbit offers a very simple asynchronous consumer method Consume
that takes a MessageConsumer
as argument. Error handling, acknowledgement, negative acknowledgement and rejection are all done internally by the
consumer.
err := client.RegisterConsumer(gorabbit.MessageConsumer{
Queue: "events_queue",
Name: "toto_consumer",
PrefetchSize: 0,
PrefetchCount: 10,
AutoAck: false,
ConcurrentProcess: false,
Handlers: gorabbit.MQTTMessageHandlers{
"event.foo.bar.created": func (payload []byte) error {
fmt.Println(string(payload))
return nil
},
},
})
- Queue: The queue to consume messages from
- Name: Unique identifier for the consumer
- PrefetchSize: The maximum size of messages that can be processed at the same time
- PrefetchCount: The maximum number of messages that can be processed at the same time
- AutoAck: Automatic acknowledgement of messages upon reception
- ConcurrentProcess: Asynchronous handling of deliveries
- Handlers: A list of handlers for specified routes
NB: RabbitMQ Wildcards are also supported.
If multiple routing keys have the same handler, a wildcard can be used, for example:
event.foo.bar.*
or event.foo.#
.
ℹ️ If the
KeepAlive
flag is set to true when initializing the client, consumers will auto-reconnect after a connection loss. This mechanism is indefinite and therefore, consuming from a non-existent queue will trigger an error repeatedly but will not affect other consumptions. This is because each consumer has its own channel.
The client offers IsReady()
and IsHealthy()
checks that can be used for monitoring.
Ready: Verifies that connections are opened and ready to launch new operations.
Healthy: Verifies that both connections and channels are opened, ready and ongoing operations are working (Consumers are consuming).
The gorabbit manager offers multiple management operations:
- Exchange, queue and bindings creation
- Exchange and queue deletion
- Queue evaluation: Exists, number of messages
- Queue operations: Pop message, push message, purge
⚠️ A manager should only be used for either testing RabbitMQ functionalities or setting up a RabbitMQ server. The manager does not provide robust mechanisms of retry and reconnection like the client.
A manager can be initialized via the constructor NewManager
. This constructor takes ManagerOptions
as an optional
parameter.
Property | Description | Default Value |
---|---|---|
Host | The hostname of the RabbitMQ server | 127.0.0.1 |
Port | The port of the RabbitMQ server | 5672 |
Username | The plain authentication username | guest |
Password | The plain authentication password | guest |
Vhost | The specific vhost to use when connection to CloudAMQP | |
UseTLS | The flag that activates the use of TLS (amqps) | false |
Mode | The mode defines whether logs are shown or not | Release |
Marshaller | The content type used for messages and how they're marshalled | JSON |
Passing nil
options will trigger the manager to use default values (host, port, credentials, etc...)
via DefaultManagerOptions()
.
manager := gorabbit.NewManager(nil)
You can also explicitly pass DefaultManagerOptions()
for a cleaner initialization.
manager := gorabbit.NewManager(gorabbit.DefaultManagerOptions())
Finally, passing a NewManagerOptions()
method also initializes default values if not overwritten.
manager := gorabbit.NewManager(gorabbit.NewManagerOptions())
You can instantiate a manager from environment variables, without the need of manually specifying options in the code.
manager := gorabbit.NewManagerFromEnv()
Here are the following supported environment variables:
RABBITMQ_HOST
: Defines the host,RABBITMQ_PORT
: Defines the port,RABBITMQ_USERNAME
: Defines the username,RABBITMQ_PASSWORD
: Defines the password,RABBITMQ_VHOST
: Defines the vhost,RABBITMQ_USE_TLS
: Defines whether to use TLS or no.
Note that environment variables are all optional, so missing keys will be replaced by their corresponding default.
We can input custom values for a specific property, either via the built-in builder or via direct struct initialization.
NewManagerOptions()
and DefaultManagerOptions()
both return an instance of *ManagerOptions
that can act as a
builder.
options := gorabbit.NewManagerOptions().
SetMode(gorabbit.Debug).
SetCredentials("root", "password")
manager := gorabbit.NewManager(options)
ℹ️ There is a setter method for each property.
ManagerOptions
is an exported type, so it can be used directly.
options := gorabbit.ManagerOptions {
Host: "localhost",
Port: 5673,
Username: "root",
Password: "password",
Mode: gorabbit.Debug,
}
manager := gorabbit.NewManager(&options)
⚠️ Direct initialization via the struct does not use default values on missing properties, so be sure to fill in every property available.
When a manager is initialized, to prevent a leak, always disconnect it when no longer needed.
manager := gorabbit.NewManager(gorabbit.DefaultManagerOptions())
defer manager.Disconnect()
The manager offers all necessary operations to manager a RabbitMQ server.
Creates an exchange with optional arguments.
err := manager.CreateExchange(gorabbit.ExchangeConfig{
Name: "events_exchange",
Type: gorabbit.ExchangeTypeTopic,
Persisted: false,
Args: nil,
})
Creates a queue with optional arguments and bindings if declared.
err := manager.CreateQueue(gorabbit.QueueConfig{
Name: "events_queue",
Durable: false,
Exclusive: false,
Args: nil,
Bindings: &[]gorabbit.BindingConfig{
{
RoutingKey: "event.foo.bar.created",
Exchange: "events_exchange",
},
},
})
Binds a queue to an exchange via a given routing key.
err := manager.BindExchangeToQueueViaRoutingKey("events_exchange", "events_queue", "event.foo.bar.created")
Returns the number of messages in a queue, or an error if the queue does not exist. This method can also evaluate the existence of a queue.
messageCount, err := manager.GetNumberOfMessages("events_queue")
Pushes a single message to a given exchange.
err := manager.PushMessageToExchange("events_exchange", "event.foo.bar.created", "single_message_payload")
Retrieves a single message from a given queue and auto acknowledges it if autoAck
is set to true.
message, err := manager.PopMessageFromQueue("events_queue", true)
Deletes all messages from a given queue.
err := manager.PurgeQueue("events_queue")
Deletes a given queue.
err := manager.DeleteQueue("events_queue")
Deletes a given exchange.
err := manager.DeleteExchange("events_exchange")
You can setup exchanges, queues and bindings automatically by referencing a RabbitMQ Schema Definition JSON file.
err := manager.SetupFromDefinitions("/path/to/definitions.json")
⚠️ The standard RabbitMQ definitions file contains configurations forusers
,vhosts
andpermissions
. Those configurations are not taken into consideration in theSetupFromDefinitions
method.
To run a local rabbitMQ server quickly with a docker container, simply run the following command:
docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management
It will launch a local RabbitMQ server mapped on port 5672, and the management dashboard will be mapped on port 15672 accessible on localhost:15672 with a username "guest" and password "guest".
Gorabbit is licensed under the MIT.