From 436211bbe8fcda5580d781bd2828174d9a939e99 Mon Sep 17 00:00:00 2001 From: Svante Bengtson Date: Tue, 16 May 2023 17:34:53 +0200 Subject: [PATCH] Allow creating multiple connections and select which one to use fixes #24 --- amqp.go | 86 +++++++++++++++++++++++++++++++++++++--------------- exchanges.go | 71 ++++++++++++++++++++++++++++++++++--------- queues.go | 83 +++++++++++++++++++++++++++++++++++++++++++------- 3 files changed, 191 insertions(+), 49 deletions(-) diff --git a/amqp.go b/amqp.go index ffe5ccb..43a09cd 100644 --- a/amqp.go +++ b/amqp.go @@ -5,6 +5,7 @@ import ( "context" "encoding/json" "time" + "fmt" amqpDriver "github.com/rabbitmq/amqp091-go" "github.com/vmihailenco/msgpack/v5" @@ -15,10 +16,11 @@ const version = "v0.3.0" // AMQP type holds connection to a remote AMQP server. type AMQP struct { - Version string - Connection *amqpDriver.Connection - Queue *Queue - Exchange *Exchange + Version string + Connections *map[int]*amqpDriver.Connection + MaxConnId *int + Queue *Queue + Exchange *Exchange } // Options defines configuration options for an AMQP session. @@ -28,6 +30,7 @@ type Options struct { // PublishOptions defines a message payload with delivery options. type PublishOptions struct { + ConnectionId int QueueName string Body string Headers amqpDriver.Table @@ -61,30 +64,51 @@ type ListenerType func(string) error // ListenOptions defines options for subscribing to message(s) within a queue. type ListenOptions struct { - Listener ListenerType - QueueName string - Consumer string - AutoAck bool - Exclusive bool - NoLocal bool - NoWait bool - Args amqpDriver.Table + ConnectionId int + Listener ListenerType + QueueName string + Consumer string + AutoAck bool + Exclusive bool + NoLocal bool + NoWait bool + Args amqpDriver.Table } const messagepack = "application/x-msgpack" // Start establishes a session with an AMQP server given the provided options. -func (amqp *AMQP) Start(options Options) error { +func (amqp *AMQP) Start(options Options) (int, error) { conn, err := amqpDriver.Dial(options.ConnectionURL) - amqp.Connection = conn - amqp.Queue.Connection = conn - amqp.Exchange.Connection = conn - return err + *amqp.MaxConnId += 1 + (*amqp.Connections)[*amqp.MaxConnId] = conn + return *amqp.MaxConnId, err +} + +// Gets an initialised connection by ID, or returns the last initialised one if ID is 0 +func (amqp *AMQP) GetConn(connId int) (*amqpDriver.Connection, error) { + if connId == 0 { + conn := (*amqp.Connections)[*amqp.MaxConnId] + if conn == nil { + return &amqpDriver.Connection{}, fmt.Errorf("Connection not initialised") + } + return conn, nil + } else { + conn := (*amqp.Connections)[connId] + if conn == nil { + return &amqpDriver.Connection{}, fmt.Errorf("Connection with ID %d not initialised", connId) + } + return conn, nil + } } // Publish delivers the payload using options provided. func (amqp *AMQP) Publish(options PublishOptions) error { - ch, err := amqp.Connection.Channel() + conn, err := amqp.GetConn(options.ConnectionId) + if err != nil { + return err + } + ch, err := conn.Channel() if err != nil { return err } @@ -141,7 +165,11 @@ func (amqp *AMQP) Publish(options PublishOptions) error { // Listen binds to an AMQP queue in order to receive message(s) as they are received. func (amqp *AMQP) Listen(options ListenOptions) error { - ch, err := amqp.Connection.Channel() + conn, err := amqp.GetConn(options.ConnectionId) + if err != nil { + return err + } + ch, err := conn.Channel() if err != nil { return err } @@ -171,12 +199,22 @@ func (amqp *AMQP) Listen(options ListenOptions) error { } func init() { - queue := Queue{} - exchange := Exchange{} + connections := make(map[int]*amqpDriver.Connection) + maxConnId := 0 + queue := Queue{ + Connections: &connections, + MaxConnId: &maxConnId, + } + exchange := Exchange{ + Connections: &connections, + MaxConnId: &maxConnId, + } generalAMQP := AMQP{ - Version: version, - Queue: &queue, - Exchange: &exchange, + Version: version, + Connections: &connections, + MaxConnId: &maxConnId, + Queue: &queue, + Exchange: &exchange, } modules.Register("k6/x/amqp", &generalAMQP) diff --git a/exchanges.go b/exchanges.go index 74d06b4..e594416 100644 --- a/exchanges.go +++ b/exchanges.go @@ -1,13 +1,15 @@ package amqp import ( + "fmt" amqpDriver "github.com/rabbitmq/amqp091-go" ) // Exchange defines a connection to publish/subscribe destinations. type Exchange struct { - Version string - Connection *amqpDriver.Connection + Version string + Connections *map[int]*amqpDriver.Connection + MaxConnId *int } // ExchangeOptions defines configuration settings for accessing an exchange. @@ -17,17 +19,24 @@ type ExchangeOptions struct { // ExchangeDeclareOptions provides options when declaring (creating) an exchange. type ExchangeDeclareOptions struct { - Name string - Kind string - Durable bool - AutoDelete bool - Internal bool - NoWait bool - Args amqpDriver.Table + ConnectionId int + Name string + Kind string + Durable bool + AutoDelete bool + Internal bool + NoWait bool + Args amqpDriver.Table +} + +// ExchangeDeleteOptions provides options when deleting an exchange. +type ExchangeDeleteOptions struct { + ConnectionId int } // ExchangeBindOptions provides options when binding (subscribing) one exchange to another. type ExchangeBindOptions struct { + ConnectionId int DestinationExchangeName string SourceExchangeName string RoutingKey string @@ -37,6 +46,7 @@ type ExchangeBindOptions struct { // ExchangeUnbindOptions provides options when unbinding (unsubscribing) one exchange from another. type ExchangeUnbindOptions struct { + ConnectionId int DestinationExchangeName string SourceExchangeName string RoutingKey string @@ -44,9 +54,30 @@ type ExchangeUnbindOptions struct { Args amqpDriver.Table } +// Gets an initialised connection by ID, or returns the last initialised one if ID is 0 +func (exchange *Exchange) GetConn(connId int) (*amqpDriver.Connection, error) { + if connId == 0 { + conn := (*exchange.Connections)[*exchange.MaxConnId] + if conn == nil { + return &amqpDriver.Connection{}, fmt.Errorf("Connection not initialised") + } + return conn, nil + } else { + conn := (*exchange.Connections)[connId] + if conn == nil { + return &amqpDriver.Connection{}, fmt.Errorf("Connection with ID %d not initialised", connId) + } + return conn, nil + } +} + // Declare creates a new exchange given the provided options. func (exchange *Exchange) Declare(options ExchangeDeclareOptions) error { - ch, err := exchange.Connection.Channel() + conn, err := exchange.GetConn(options.ConnectionId) + if err != nil { + return err + } + ch, err := conn.Channel() if err != nil { return err } @@ -65,8 +96,12 @@ func (exchange *Exchange) Declare(options ExchangeDeclareOptions) error { } // Delete removes an exchange from the remote server given the exchange name. -func (exchange *Exchange) Delete(name string) error { - ch, err := exchange.Connection.Channel() +func (exchange *Exchange) Delete(name string, options ExchangeDeleteOptions) error { + conn, err := exchange.GetConn(options.ConnectionId) + if err != nil { + return err + } + ch, err := conn.Channel() if err != nil { return err } @@ -82,7 +117,11 @@ func (exchange *Exchange) Delete(name string) error { // Bind subscribes one exchange to another. func (exchange *Exchange) Bind(options ExchangeBindOptions) error { - ch, err := exchange.Connection.Channel() + conn, err := exchange.GetConn(options.ConnectionId) + if err != nil { + return err + } + ch, err := conn.Channel() if err != nil { return err } @@ -100,7 +139,11 @@ func (exchange *Exchange) Bind(options ExchangeBindOptions) error { // Unbind removes a subscription from one exchange to another. func (exchange *Exchange) Unbind(options ExchangeUnbindOptions) error { - ch, err := exchange.Connection.Channel() + conn, err := exchange.GetConn(options.ConnectionId) + if err != nil { + return err + } + ch, err := conn.Channel() if err != nil { return err } diff --git a/queues.go b/queues.go index ac4b2f4..fca8499 100644 --- a/queues.go +++ b/queues.go @@ -1,13 +1,15 @@ package amqp import ( + "fmt" amqpDriver "github.com/rabbitmq/amqp091-go" ) // Queue defines a connection to a point-to-point destination. type Queue struct { - Version string - Connection *amqpDriver.Connection + Version string + Connections *map[int]*amqpDriver.Connection + MaxConnId *int } // QueueOptions defines configuration settings for accessing a queue. @@ -17,6 +19,7 @@ type QueueOptions struct { // DeclareOptions provides queue options when declaring (creating) a queue. type DeclareOptions struct { + ConnectionId int Name string Durable bool DeleteWhenUnused bool @@ -25,8 +28,19 @@ type DeclareOptions struct { Args amqpDriver.Table } +// QueueInspectOptions provide options when inspecting a queue. +type QueueInspectOptions struct { + ConnectionId int +} + +// QueueDeleteOptions provide options when deleting a queue. +type QueueDeleteOptions struct { + ConnectionId int +} + // QueueBindOptions provides options when binding a queue to an exchange in order to receive message(s). type QueueBindOptions struct { + ConnectionId int QueueName string ExchangeName string RoutingKey string @@ -36,15 +50,42 @@ type QueueBindOptions struct { // QueueUnbindOptions provides options when unbinding a queue from an exchange to stop receiving message(s). type QueueUnbindOptions struct { + ConnectionId int QueueName string ExchangeName string RoutingKey string Args amqpDriver.Table } +// QueuePurgeOptions provide options when purging (emptying) a queue. +type QueuePurgeOptions struct { + ConnectionId int +} + +// Gets an initialised connection by ID, or returns the last initialised one if ID is 0 +func (queue *Queue) GetConn(connId int) (*amqpDriver.Connection, error) { + if connId == 0 { + conn := (*queue.Connections)[*queue.MaxConnId] + if conn == nil { + return &amqpDriver.Connection{}, fmt.Errorf("Connection not initialised") + } + return conn, nil + } else { + conn := (*queue.Connections)[connId] + if conn == nil { + return &amqpDriver.Connection{}, fmt.Errorf("Connection with ID %d not initialised", connId) + } + return conn, nil + } +} + // Declare creates a new queue given the provided options. func (queue *Queue) Declare(options DeclareOptions) (amqpDriver.Queue, error) { - ch, err := queue.Connection.Channel() + conn, err := queue.GetConn(options.ConnectionId) + if err != nil { + return amqpDriver.Queue{}, err + } + ch, err := conn.Channel() if err != nil { return amqpDriver.Queue{}, err } @@ -62,8 +103,12 @@ func (queue *Queue) Declare(options DeclareOptions) (amqpDriver.Queue, error) { } // Inspect provides queue metadata given queue name. -func (queue *Queue) Inspect(name string) (amqpDriver.Queue, error) { - ch, err := queue.Connection.Channel() +func (queue *Queue) Inspect(name string, options QueueInspectOptions) (amqpDriver.Queue, error) { + conn, err := queue.GetConn(options.ConnectionId) + if err != nil { + return amqpDriver.Queue{}, err + } + ch, err := conn.Channel() if err != nil { return amqpDriver.Queue{}, err } @@ -74,8 +119,12 @@ func (queue *Queue) Inspect(name string) (amqpDriver.Queue, error) { } // Delete removes a queue from the remote server given the queue name. -func (queue *Queue) Delete(name string) error { - ch, err := queue.Connection.Channel() +func (queue *Queue) Delete(name string, options QueueDeleteOptions) error { + conn, err := queue.GetConn(options.ConnectionId) + if err != nil { + return err + } + ch, err := conn.Channel() if err != nil { return err } @@ -93,7 +142,11 @@ func (queue *Queue) Delete(name string) error { // Bind subscribes a queue to an exchange in order to receive message(s). func (queue *Queue) Bind(options QueueBindOptions) error { - ch, err := queue.Connection.Channel() + conn, err := queue.GetConn(options.ConnectionId) + if err != nil { + return err + } + ch, err := conn.Channel() if err != nil { return err } @@ -111,7 +164,11 @@ func (queue *Queue) Bind(options QueueBindOptions) error { // Unbind removes a queue subscription from an exchange to discontinue receiving message(s). func (queue *Queue) Unbind(options QueueUnbindOptions) error { - ch, err := queue.Connection.Channel() + conn, err := queue.GetConn(options.ConnectionId) + if err != nil { + return err + } + ch, err := conn.Channel() if err != nil { return err } @@ -127,8 +184,12 @@ func (queue *Queue) Unbind(options QueueUnbindOptions) error { } // Purge removes all non-consumed message(s) from the specified queue. -func (queue *Queue) Purge(name string, noWait bool) (int, error) { - ch, err := queue.Connection.Channel() +func (queue *Queue) Purge(name string, noWait bool, options QueuePurgeOptions) (int, error) { + conn, err := queue.GetConn(options.ConnectionId) + if err != nil { + return 0, err + } + ch, err := conn.Channel() if err != nil { return 0, err }