Skip to content

Commit

Permalink
Allow creating multiple connections and select which one to use
Browse files Browse the repository at this point in the history
  • Loading branch information
swantzter committed May 16, 2023
1 parent fad34a9 commit ebfe638
Show file tree
Hide file tree
Showing 3 changed files with 199 additions and 57 deletions.
102 changes: 70 additions & 32 deletions amqp.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package amqp
import (
"context"
"encoding/json"
"fmt"

amqpDriver "github.com/rabbitmq/amqp091-go"
"github.com/vmihailenco/msgpack/v5"
Expand All @@ -14,10 +15,11 @@ const version = "v0.3.0"

// AMQP type holds connection to a remote AMQP server.
type AMQP struct {
Version string
Connection *amqpDriver.Connection
Queue *Queue
Exchange *Exchange
Version string
Connections *map[int]*amqpDriver.Connection
MaxConnId *int
Queue *Queue
Exchange *Exchange
}

// Options defines configuration options for an AMQP session.
Expand All @@ -27,14 +29,15 @@ type Options struct {

// PublishOptions defines a message payload with delivery options.
type PublishOptions struct {
QueueName string
Body string
Headers amqpDriver.Table
Exchange string
ContentType string
Mandatory bool
Immediate bool
Persistent bool
ConnectionId int
QueueName string
Body string
Headers amqpDriver.Table
Exchange string
ContentType string
Mandatory bool
Immediate bool
Persistent bool
}

// ConsumeOptions defines options for use when consuming a message.
Expand All @@ -52,30 +55,51 @@ type ListenerType func(string) error

// ListenOptions defines options for subscribing to message(s) within a queue.
type ListenOptions struct {
Listener ListenerType
QueueName string
Consumer string
AutoAck bool
Exclusive bool
NoLocal bool
NoWait bool
Args amqpDriver.Table
ConnectionId int
Listener ListenerType
QueueName string
Consumer string
AutoAck bool
Exclusive bool
NoLocal bool
NoWait bool
Args amqpDriver.Table
}

const messagepack = "application/x-msgpack"

// Start establishes a session with an AMQP server given the provided options.
func (amqp *AMQP) Start(options Options) error {
func (amqp *AMQP) Start(options Options) (int, error) {
conn, err := amqpDriver.Dial(options.ConnectionURL)
amqp.Connection = conn
amqp.Queue.Connection = conn
amqp.Exchange.Connection = conn
return err
*amqp.MaxConnId += 1
(*amqp.Connections)[*amqp.MaxConnId] = conn
return *amqp.MaxConnId, err
}

// Gets an initialised connection by ID, or returns the last initialised one if ID is 0
func (amqp *AMQP) GetConn(connId int) (*amqpDriver.Connection, error) {
if connId == 0 {
conn := (*amqp.Connections)[*amqp.MaxConnId]
if conn == nil {
return &amqpDriver.Connection{}, fmt.Errorf("Connection not initialised")
}
return conn, nil
} else {
conn := (*amqp.Connections)[connId]
if conn == nil {
return &amqpDriver.Connection{}, fmt.Errorf("Connection with ID %d not initialised", connId)
}
return conn, nil
}
}

// Publish delivers the payload using options provided.
func (amqp *AMQP) Publish(options PublishOptions) error {
ch, err := amqp.Connection.Channel()
conn, err := amqp.GetConn(options.ConnectionId)
if err != nil {
return err
}
ch, err := conn.Channel()
if err != nil {
return err
}
Expand Down Expand Up @@ -119,7 +143,11 @@ func (amqp *AMQP) Publish(options PublishOptions) error {

// Listen binds to an AMQP queue in order to receive message(s) as they are received.
func (amqp *AMQP) Listen(options ListenOptions) error {
ch, err := amqp.Connection.Channel()
conn, err := amqp.GetConn(options.ConnectionId)
if err != nil {
return err
}
ch, err := conn.Channel()
if err != nil {
return err
}
Expand Down Expand Up @@ -149,12 +177,22 @@ func (amqp *AMQP) Listen(options ListenOptions) error {
}

func init() {
queue := Queue{}
exchange := Exchange{}
connections := make(map[int]*amqpDriver.Connection)
maxConnId := 0
queue := Queue{
Connections: &connections,
MaxConnId: &maxConnId,
}
exchange := Exchange{
Connections: &connections,
MaxConnId: &maxConnId,
}
generalAMQP := AMQP{
Version: version,
Queue: &queue,
Exchange: &exchange,
Version: version,
Connections: &connections,
MaxConnId: &maxConnId,
Queue: &queue,
Exchange: &exchange,
}

modules.Register("k6/x/amqp", &generalAMQP)
Expand Down
71 changes: 57 additions & 14 deletions exchanges.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
package amqp

import (
"fmt"
amqpDriver "github.com/rabbitmq/amqp091-go"
)

// Exchange defines a connection to publish/subscribe destinations.
type Exchange struct {
Version string
Connection *amqpDriver.Connection
Version string
Connections *map[int]*amqpDriver.Connection
MaxConnId *int
}

// ExchangeOptions defines configuration settings for accessing an exchange.
Expand All @@ -17,17 +19,24 @@ type ExchangeOptions struct {

// ExchangeDeclareOptions provides options when declaring (creating) an exchange.
type ExchangeDeclareOptions struct {
Name string
Kind string
Durable bool
AutoDelete bool
Internal bool
NoWait bool
Args amqpDriver.Table
ConnectionId int
Name string
Kind string
Durable bool
AutoDelete bool
Internal bool
NoWait bool
Args amqpDriver.Table
}

// ExchangeDeleteOptions provides options when deleting an exchange.
type ExchangeDeleteOptions struct {
ConnectionId int
}

// ExchangeBindOptions provides options when binding (subscribing) one exchange to another.
type ExchangeBindOptions struct {
ConnectionId int
DestinationExchangeName string
SourceExchangeName string
RoutingKey string
Expand All @@ -37,16 +46,38 @@ type ExchangeBindOptions struct {

// ExchangeUnbindOptions provides options when unbinding (unsubscribing) one exchange from another.
type ExchangeUnbindOptions struct {
ConnectionId int
DestinationExchangeName string
SourceExchangeName string
RoutingKey string
NoWait bool
Args amqpDriver.Table
}

// Gets an initialised connection by ID, or returns the last initialised one if ID is 0
func (exchange *Exchange) GetConn(connId int) (*amqpDriver.Connection, error) {
if connId == 0 {
conn := (*exchange.Connections)[*exchange.MaxConnId]
if conn == nil {
return &amqpDriver.Connection{}, fmt.Errorf("Connection not initialised")
}
return conn, nil
} else {
conn := (*exchange.Connections)[connId]
if conn == nil {
return &amqpDriver.Connection{}, fmt.Errorf("Connection with ID %d not initialised", connId)
}
return conn, nil
}
}

// Declare creates a new exchange given the provided options.
func (exchange *Exchange) Declare(options ExchangeDeclareOptions) error {
ch, err := exchange.Connection.Channel()
conn, err := exchange.GetConn(options.ConnectionId)
if err != nil {
return err
}
ch, err := conn.Channel()
if err != nil {
return err
}
Expand All @@ -65,8 +96,12 @@ func (exchange *Exchange) Declare(options ExchangeDeclareOptions) error {
}

// Delete removes an exchange from the remote server given the exchange name.
func (exchange *Exchange) Delete(name string) error {
ch, err := exchange.Connection.Channel()
func (exchange *Exchange) Delete(name string, options ExchangeDeleteOptions) error {
conn, err := exchange.GetConn(options.ConnectionId)
if err != nil {
return err
}
ch, err := conn.Channel()
if err != nil {
return err
}
Expand All @@ -82,7 +117,11 @@ func (exchange *Exchange) Delete(name string) error {

// Bind subscribes one exchange to another.
func (exchange *Exchange) Bind(options ExchangeBindOptions) error {
ch, err := exchange.Connection.Channel()
conn, err := exchange.GetConn(options.ConnectionId)
if err != nil {
return err
}
ch, err := conn.Channel()
if err != nil {
return err
}
Expand All @@ -100,7 +139,11 @@ func (exchange *Exchange) Bind(options ExchangeBindOptions) error {

// Unbind removes a subscription from one exchange to another.
func (exchange *Exchange) Unbind(options ExchangeUnbindOptions) error {
ch, err := exchange.Connection.Channel()
conn, err := exchange.GetConn(options.ConnectionId)
if err != nil {
return err
}
ch, err := conn.Channel()
if err != nil {
return err
}
Expand Down
Loading

0 comments on commit ebfe638

Please sign in to comment.