Skip to content
This repository has been archived by the owner on Dec 8, 2023. It is now read-only.

Commit

Permalink
Allow creating multiple connections and select which one to use
Browse files Browse the repository at this point in the history
fixes #24
  • Loading branch information
swantzter committed Jul 31, 2023
1 parent 1579c55 commit 436211b
Show file tree
Hide file tree
Showing 3 changed files with 191 additions and 49 deletions.
86 changes: 62 additions & 24 deletions amqp.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"encoding/json"
"time"
"fmt"

amqpDriver "github.com/rabbitmq/amqp091-go"
"github.com/vmihailenco/msgpack/v5"
Expand All @@ -15,10 +16,11 @@ const version = "v0.3.0"

// AMQP type holds connection to a remote AMQP server.
type AMQP struct {
Version string
Connection *amqpDriver.Connection
Queue *Queue
Exchange *Exchange
Version string
Connections *map[int]*amqpDriver.Connection
MaxConnId *int
Queue *Queue
Exchange *Exchange
}

// Options defines configuration options for an AMQP session.
Expand All @@ -28,6 +30,7 @@ type Options struct {

// PublishOptions defines a message payload with delivery options.
type PublishOptions struct {
ConnectionId int
QueueName string
Body string
Headers amqpDriver.Table
Expand Down Expand Up @@ -61,30 +64,51 @@ type ListenerType func(string) error

// ListenOptions defines options for subscribing to message(s) within a queue.
type ListenOptions struct {
Listener ListenerType
QueueName string
Consumer string
AutoAck bool
Exclusive bool
NoLocal bool
NoWait bool
Args amqpDriver.Table
ConnectionId int
Listener ListenerType
QueueName string
Consumer string
AutoAck bool
Exclusive bool
NoLocal bool
NoWait bool
Args amqpDriver.Table
}

const messagepack = "application/x-msgpack"

// Start establishes a session with an AMQP server given the provided options.
func (amqp *AMQP) Start(options Options) error {
func (amqp *AMQP) Start(options Options) (int, error) {
conn, err := amqpDriver.Dial(options.ConnectionURL)
amqp.Connection = conn
amqp.Queue.Connection = conn
amqp.Exchange.Connection = conn
return err
*amqp.MaxConnId += 1
(*amqp.Connections)[*amqp.MaxConnId] = conn
return *amqp.MaxConnId, err
}

// Gets an initialised connection by ID, or returns the last initialised one if ID is 0
func (amqp *AMQP) GetConn(connId int) (*amqpDriver.Connection, error) {
if connId == 0 {
conn := (*amqp.Connections)[*amqp.MaxConnId]
if conn == nil {
return &amqpDriver.Connection{}, fmt.Errorf("Connection not initialised")
}
return conn, nil
} else {
conn := (*amqp.Connections)[connId]
if conn == nil {
return &amqpDriver.Connection{}, fmt.Errorf("Connection with ID %d not initialised", connId)
}
return conn, nil
}
}

// Publish delivers the payload using options provided.
func (amqp *AMQP) Publish(options PublishOptions) error {
ch, err := amqp.Connection.Channel()
conn, err := amqp.GetConn(options.ConnectionId)
if err != nil {
return err
}
ch, err := conn.Channel()
if err != nil {
return err
}
Expand Down Expand Up @@ -141,7 +165,11 @@ func (amqp *AMQP) Publish(options PublishOptions) error {

// Listen binds to an AMQP queue in order to receive message(s) as they are received.
func (amqp *AMQP) Listen(options ListenOptions) error {
ch, err := amqp.Connection.Channel()
conn, err := amqp.GetConn(options.ConnectionId)
if err != nil {
return err
}
ch, err := conn.Channel()
if err != nil {
return err
}
Expand Down Expand Up @@ -171,12 +199,22 @@ func (amqp *AMQP) Listen(options ListenOptions) error {
}

func init() {
queue := Queue{}
exchange := Exchange{}
connections := make(map[int]*amqpDriver.Connection)
maxConnId := 0
queue := Queue{
Connections: &connections,
MaxConnId: &maxConnId,
}
exchange := Exchange{
Connections: &connections,
MaxConnId: &maxConnId,
}
generalAMQP := AMQP{
Version: version,
Queue: &queue,
Exchange: &exchange,
Version: version,
Connections: &connections,
MaxConnId: &maxConnId,
Queue: &queue,
Exchange: &exchange,
}

modules.Register("k6/x/amqp", &generalAMQP)
Expand Down
71 changes: 57 additions & 14 deletions exchanges.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
package amqp

import (
"fmt"
amqpDriver "github.com/rabbitmq/amqp091-go"
)

// Exchange defines a connection to publish/subscribe destinations.
type Exchange struct {
Version string
Connection *amqpDriver.Connection
Version string
Connections *map[int]*amqpDriver.Connection
MaxConnId *int
}

// ExchangeOptions defines configuration settings for accessing an exchange.
Expand All @@ -17,17 +19,24 @@ type ExchangeOptions struct {

// ExchangeDeclareOptions provides options when declaring (creating) an exchange.
type ExchangeDeclareOptions struct {
Name string
Kind string
Durable bool
AutoDelete bool
Internal bool
NoWait bool
Args amqpDriver.Table
ConnectionId int
Name string
Kind string
Durable bool
AutoDelete bool
Internal bool
NoWait bool
Args amqpDriver.Table
}

// ExchangeDeleteOptions provides options when deleting an exchange.
type ExchangeDeleteOptions struct {
ConnectionId int
}

// ExchangeBindOptions provides options when binding (subscribing) one exchange to another.
type ExchangeBindOptions struct {
ConnectionId int
DestinationExchangeName string
SourceExchangeName string
RoutingKey string
Expand All @@ -37,16 +46,38 @@ type ExchangeBindOptions struct {

// ExchangeUnbindOptions provides options when unbinding (unsubscribing) one exchange from another.
type ExchangeUnbindOptions struct {
ConnectionId int
DestinationExchangeName string
SourceExchangeName string
RoutingKey string
NoWait bool
Args amqpDriver.Table
}

// Gets an initialised connection by ID, or returns the last initialised one if ID is 0
func (exchange *Exchange) GetConn(connId int) (*amqpDriver.Connection, error) {
if connId == 0 {
conn := (*exchange.Connections)[*exchange.MaxConnId]
if conn == nil {
return &amqpDriver.Connection{}, fmt.Errorf("Connection not initialised")
}
return conn, nil
} else {
conn := (*exchange.Connections)[connId]
if conn == nil {
return &amqpDriver.Connection{}, fmt.Errorf("Connection with ID %d not initialised", connId)
}
return conn, nil
}
}

// Declare creates a new exchange given the provided options.
func (exchange *Exchange) Declare(options ExchangeDeclareOptions) error {
ch, err := exchange.Connection.Channel()
conn, err := exchange.GetConn(options.ConnectionId)
if err != nil {
return err
}
ch, err := conn.Channel()
if err != nil {
return err
}
Expand All @@ -65,8 +96,12 @@ func (exchange *Exchange) Declare(options ExchangeDeclareOptions) error {
}

// Delete removes an exchange from the remote server given the exchange name.
func (exchange *Exchange) Delete(name string) error {
ch, err := exchange.Connection.Channel()
func (exchange *Exchange) Delete(name string, options ExchangeDeleteOptions) error {
conn, err := exchange.GetConn(options.ConnectionId)
if err != nil {
return err
}
ch, err := conn.Channel()
if err != nil {
return err
}
Expand All @@ -82,7 +117,11 @@ func (exchange *Exchange) Delete(name string) error {

// Bind subscribes one exchange to another.
func (exchange *Exchange) Bind(options ExchangeBindOptions) error {
ch, err := exchange.Connection.Channel()
conn, err := exchange.GetConn(options.ConnectionId)
if err != nil {
return err
}
ch, err := conn.Channel()
if err != nil {
return err
}
Expand All @@ -100,7 +139,11 @@ func (exchange *Exchange) Bind(options ExchangeBindOptions) error {

// Unbind removes a subscription from one exchange to another.
func (exchange *Exchange) Unbind(options ExchangeUnbindOptions) error {
ch, err := exchange.Connection.Channel()
conn, err := exchange.GetConn(options.ConnectionId)
if err != nil {
return err
}
ch, err := conn.Channel()
if err != nil {
return err
}
Expand Down
Loading

0 comments on commit 436211b

Please sign in to comment.