diff --git a/amqp.go b/amqp.go index ffe5ccb..b635d03 100644 --- a/amqp.go +++ b/amqp.go @@ -4,6 +4,7 @@ package amqp import ( "context" "encoding/json" + "fmt" "time" amqpDriver "github.com/rabbitmq/amqp091-go" @@ -15,10 +16,11 @@ const version = "v0.3.0" // AMQP type holds connection to a remote AMQP server. type AMQP struct { - Version string - Connection *amqpDriver.Connection - Queue *Queue - Exchange *Exchange + Version string + Connections *map[int]*amqpDriver.Connection + MaxConnID *int + Queue *Queue + Exchange *Exchange } // Options defines configuration options for an AMQP session. @@ -28,6 +30,7 @@ type Options struct { // PublishOptions defines a message payload with delivery options. type PublishOptions struct { + ConnectionID int QueueName string Body string Headers amqpDriver.Table @@ -61,30 +64,51 @@ type ListenerType func(string) error // ListenOptions defines options for subscribing to message(s) within a queue. type ListenOptions struct { - Listener ListenerType - QueueName string - Consumer string - AutoAck bool - Exclusive bool - NoLocal bool - NoWait bool - Args amqpDriver.Table + ConnectionID int + Listener ListenerType + QueueName string + Consumer string + AutoAck bool + Exclusive bool + NoLocal bool + NoWait bool + Args amqpDriver.Table } const messagepack = "application/x-msgpack" // Start establishes a session with an AMQP server given the provided options. -func (amqp *AMQP) Start(options Options) error { +func (amqp *AMQP) Start(options Options) (int, error) { conn, err := amqpDriver.Dial(options.ConnectionURL) - amqp.Connection = conn - amqp.Queue.Connection = conn - amqp.Exchange.Connection = conn - return err + *amqp.MaxConnID++ + (*amqp.Connections)[*amqp.MaxConnID] = conn + return *amqp.MaxConnID, err +} + +// GetConn gets an initialised connection by ID, or returns the last initialised one if ID is 0 +func (amqp *AMQP) GetConn(connID int) (*amqpDriver.Connection, error) { + if connID == 0 { + conn := (*amqp.Connections)[*amqp.MaxConnID] + if conn == nil { + return &amqpDriver.Connection{}, fmt.Errorf("connection not initialised") + } + return conn, nil + } + + conn := (*amqp.Connections)[connID] + if conn == nil { + return &amqpDriver.Connection{}, fmt.Errorf("connection with ID %d not initialised", connID) + } + return conn, nil } // Publish delivers the payload using options provided. func (amqp *AMQP) Publish(options PublishOptions) error { - ch, err := amqp.Connection.Channel() + conn, err := amqp.GetConn(options.ConnectionID) + if err != nil { + return err + } + ch, err := conn.Channel() if err != nil { return err } @@ -141,7 +165,11 @@ func (amqp *AMQP) Publish(options PublishOptions) error { // Listen binds to an AMQP queue in order to receive message(s) as they are received. func (amqp *AMQP) Listen(options ListenOptions) error { - ch, err := amqp.Connection.Channel() + conn, err := amqp.GetConn(options.ConnectionID) + if err != nil { + return err + } + ch, err := conn.Channel() if err != nil { return err } @@ -171,12 +199,22 @@ func (amqp *AMQP) Listen(options ListenOptions) error { } func init() { - queue := Queue{} - exchange := Exchange{} + connections := make(map[int]*amqpDriver.Connection) + maxConnID := 0 + queue := Queue{ + Connections: &connections, + MaxConnID: &maxConnID, + } + exchange := Exchange{ + Connections: &connections, + MaxConnID: &maxConnID, + } generalAMQP := AMQP{ - Version: version, - Queue: &queue, - Exchange: &exchange, + Version: version, + Connections: &connections, + MaxConnID: &maxConnID, + Queue: &queue, + Exchange: &exchange, } modules.Register("k6/x/amqp", &generalAMQP) diff --git a/examples/connection-per-vu.js b/examples/connection-per-vu.js new file mode 100644 index 0000000..32c9270 --- /dev/null +++ b/examples/connection-per-vu.js @@ -0,0 +1,34 @@ +import Amqp from 'k6/x/amqp' +import exec from 'k6/execution' + +export const options = { + vus: 10, + duration: '30s', +} + +const url = "amqp://guest:guest@localhost:5672/" +const connIds = new Map() + +function getConnectionId (vuId) { + if (!connIds.has(vuId)) { + const connectionId = Amqp.start({ connection_url: url }) + connIds.set(vuId, connectionId) + return connectionId + } + return connIds.get(vuId) +} + +export default function () { + console.log("K6 amqp extension enabled, version: " + Amqp.version) + + const connectionId = getConnectionId(exec.vu.idInInstance) + const queueName = 'K6 queue' + + Amqp.publish({ + connection_id: connectionId, + queue_name: queueName, + exchange: '', + content_type: 'text/plain', + body: 'Ping from k6' + }) +} diff --git a/exchanges.go b/exchanges.go index 74d06b4..4e13185 100644 --- a/exchanges.go +++ b/exchanges.go @@ -1,13 +1,16 @@ package amqp import ( + "fmt" + amqpDriver "github.com/rabbitmq/amqp091-go" ) // Exchange defines a connection to publish/subscribe destinations. type Exchange struct { - Version string - Connection *amqpDriver.Connection + Version string + Connections *map[int]*amqpDriver.Connection + MaxConnID *int } // ExchangeOptions defines configuration settings for accessing an exchange. @@ -17,17 +20,24 @@ type ExchangeOptions struct { // ExchangeDeclareOptions provides options when declaring (creating) an exchange. type ExchangeDeclareOptions struct { - Name string - Kind string - Durable bool - AutoDelete bool - Internal bool - NoWait bool - Args amqpDriver.Table + ConnectionID int + Name string + Kind string + Durable bool + AutoDelete bool + Internal bool + NoWait bool + Args amqpDriver.Table +} + +// ExchangeDeleteOptions provides options when deleting an exchange. +type ExchangeDeleteOptions struct { + ConnectionID int } // ExchangeBindOptions provides options when binding (subscribing) one exchange to another. type ExchangeBindOptions struct { + ConnectionID int DestinationExchangeName string SourceExchangeName string RoutingKey string @@ -37,6 +47,7 @@ type ExchangeBindOptions struct { // ExchangeUnbindOptions provides options when unbinding (unsubscribing) one exchange from another. type ExchangeUnbindOptions struct { + ConnectionID int DestinationExchangeName string SourceExchangeName string RoutingKey string @@ -44,9 +55,30 @@ type ExchangeUnbindOptions struct { Args amqpDriver.Table } +// GetConn gets an initialised connection by ID, or returns the last initialised one if ID is 0 +func (exchange *Exchange) GetConn(connID int) (*amqpDriver.Connection, error) { + if connID == 0 { + conn := (*exchange.Connections)[*exchange.MaxConnID] + if conn == nil { + return &amqpDriver.Connection{}, fmt.Errorf("connection not initialised") + } + return conn, nil + } + + conn := (*exchange.Connections)[connID] + if conn == nil { + return &amqpDriver.Connection{}, fmt.Errorf("connection with ID %d not initialised", connID) + } + return conn, nil +} + // Declare creates a new exchange given the provided options. func (exchange *Exchange) Declare(options ExchangeDeclareOptions) error { - ch, err := exchange.Connection.Channel() + conn, err := exchange.GetConn(options.ConnectionID) + if err != nil { + return err + } + ch, err := conn.Channel() if err != nil { return err } @@ -65,8 +97,12 @@ func (exchange *Exchange) Declare(options ExchangeDeclareOptions) error { } // Delete removes an exchange from the remote server given the exchange name. -func (exchange *Exchange) Delete(name string) error { - ch, err := exchange.Connection.Channel() +func (exchange *Exchange) Delete(name string, options ExchangeDeleteOptions) error { + conn, err := exchange.GetConn(options.ConnectionID) + if err != nil { + return err + } + ch, err := conn.Channel() if err != nil { return err } @@ -82,7 +118,11 @@ func (exchange *Exchange) Delete(name string) error { // Bind subscribes one exchange to another. func (exchange *Exchange) Bind(options ExchangeBindOptions) error { - ch, err := exchange.Connection.Channel() + conn, err := exchange.GetConn(options.ConnectionID) + if err != nil { + return err + } + ch, err := conn.Channel() if err != nil { return err } @@ -100,7 +140,11 @@ func (exchange *Exchange) Bind(options ExchangeBindOptions) error { // Unbind removes a subscription from one exchange to another. func (exchange *Exchange) Unbind(options ExchangeUnbindOptions) error { - ch, err := exchange.Connection.Channel() + conn, err := exchange.GetConn(options.ConnectionID) + if err != nil { + return err + } + ch, err := conn.Channel() if err != nil { return err } diff --git a/queues.go b/queues.go index ac4b2f4..f449c48 100644 --- a/queues.go +++ b/queues.go @@ -1,13 +1,16 @@ package amqp import ( + "fmt" + amqpDriver "github.com/rabbitmq/amqp091-go" ) // Queue defines a connection to a point-to-point destination. type Queue struct { - Version string - Connection *amqpDriver.Connection + Version string + Connections *map[int]*amqpDriver.Connection + MaxConnID *int } // QueueOptions defines configuration settings for accessing a queue. @@ -17,6 +20,7 @@ type QueueOptions struct { // DeclareOptions provides queue options when declaring (creating) a queue. type DeclareOptions struct { + ConnectionID int Name string Durable bool DeleteWhenUnused bool @@ -25,8 +29,19 @@ type DeclareOptions struct { Args amqpDriver.Table } +// QueueInspectOptions provide options when inspecting a queue. +type QueueInspectOptions struct { + ConnectionID int +} + +// QueueDeleteOptions provide options when deleting a queue. +type QueueDeleteOptions struct { + ConnectionID int +} + // QueueBindOptions provides options when binding a queue to an exchange in order to receive message(s). type QueueBindOptions struct { + ConnectionID int QueueName string ExchangeName string RoutingKey string @@ -36,15 +51,42 @@ type QueueBindOptions struct { // QueueUnbindOptions provides options when unbinding a queue from an exchange to stop receiving message(s). type QueueUnbindOptions struct { + ConnectionID int QueueName string ExchangeName string RoutingKey string Args amqpDriver.Table } +// QueuePurgeOptions provide options when purging (emptying) a queue. +type QueuePurgeOptions struct { + ConnectionID int +} + +// GetConn gets an initialised connection by ID, or returns the last initialised one if ID is 0 +func (queue *Queue) GetConn(connID int) (*amqpDriver.Connection, error) { + if connID == 0 { + conn := (*queue.Connections)[*queue.MaxConnID] + if conn == nil { + return &amqpDriver.Connection{}, fmt.Errorf("connection not initialised") + } + return conn, nil + } + + conn := (*queue.Connections)[connID] + if conn == nil { + return &amqpDriver.Connection{}, fmt.Errorf("connection with ID %d not initialised", connID) + } + return conn, nil +} + // Declare creates a new queue given the provided options. func (queue *Queue) Declare(options DeclareOptions) (amqpDriver.Queue, error) { - ch, err := queue.Connection.Channel() + conn, err := queue.GetConn(options.ConnectionID) + if err != nil { + return amqpDriver.Queue{}, err + } + ch, err := conn.Channel() if err != nil { return amqpDriver.Queue{}, err } @@ -62,8 +104,12 @@ func (queue *Queue) Declare(options DeclareOptions) (amqpDriver.Queue, error) { } // Inspect provides queue metadata given queue name. -func (queue *Queue) Inspect(name string) (amqpDriver.Queue, error) { - ch, err := queue.Connection.Channel() +func (queue *Queue) Inspect(name string, options QueueInspectOptions) (amqpDriver.Queue, error) { + conn, err := queue.GetConn(options.ConnectionID) + if err != nil { + return amqpDriver.Queue{}, err + } + ch, err := conn.Channel() if err != nil { return amqpDriver.Queue{}, err } @@ -74,8 +120,12 @@ func (queue *Queue) Inspect(name string) (amqpDriver.Queue, error) { } // Delete removes a queue from the remote server given the queue name. -func (queue *Queue) Delete(name string) error { - ch, err := queue.Connection.Channel() +func (queue *Queue) Delete(name string, options QueueDeleteOptions) error { + conn, err := queue.GetConn(options.ConnectionID) + if err != nil { + return err + } + ch, err := conn.Channel() if err != nil { return err } @@ -93,7 +143,11 @@ func (queue *Queue) Delete(name string) error { // Bind subscribes a queue to an exchange in order to receive message(s). func (queue *Queue) Bind(options QueueBindOptions) error { - ch, err := queue.Connection.Channel() + conn, err := queue.GetConn(options.ConnectionID) + if err != nil { + return err + } + ch, err := conn.Channel() if err != nil { return err } @@ -111,7 +165,11 @@ func (queue *Queue) Bind(options QueueBindOptions) error { // Unbind removes a queue subscription from an exchange to discontinue receiving message(s). func (queue *Queue) Unbind(options QueueUnbindOptions) error { - ch, err := queue.Connection.Channel() + conn, err := queue.GetConn(options.ConnectionID) + if err != nil { + return err + } + ch, err := conn.Channel() if err != nil { return err } @@ -127,8 +185,12 @@ func (queue *Queue) Unbind(options QueueUnbindOptions) error { } // Purge removes all non-consumed message(s) from the specified queue. -func (queue *Queue) Purge(name string, noWait bool) (int, error) { - ch, err := queue.Connection.Channel() +func (queue *Queue) Purge(name string, noWait bool, options QueuePurgeOptions) (int, error) { + conn, err := queue.GetConn(options.ConnectionID) + if err != nil { + return 0, err + } + ch, err := conn.Channel() if err != nil { return 0, err }