Skip to content
This repository has been archived by the owner on Dec 8, 2023. It is now read-only.

Allow creating multiple connections and select which one to use #27

Merged
merged 4 commits into from
Aug 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 62 additions & 24 deletions amqp.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package amqp
import (
"context"
"encoding/json"
"fmt"
"time"

amqpDriver "github.com/rabbitmq/amqp091-go"
Expand All @@ -15,10 +16,11 @@ const version = "v0.3.0"

// AMQP type holds connection to a remote AMQP server.
type AMQP struct {
Version string
Connection *amqpDriver.Connection
Queue *Queue
Exchange *Exchange
Version string
Connections *map[int]*amqpDriver.Connection
MaxConnID *int
Queue *Queue
Exchange *Exchange
}

// Options defines configuration options for an AMQP session.
Expand All @@ -28,6 +30,7 @@ type Options struct {

// PublishOptions defines a message payload with delivery options.
type PublishOptions struct {
ConnectionID int
QueueName string
Body string
Headers amqpDriver.Table
Expand Down Expand Up @@ -61,30 +64,51 @@ type ListenerType func(string) error

// ListenOptions defines options for subscribing to message(s) within a queue.
type ListenOptions struct {
Listener ListenerType
QueueName string
Consumer string
AutoAck bool
Exclusive bool
NoLocal bool
NoWait bool
Args amqpDriver.Table
ConnectionID int
Listener ListenerType
QueueName string
Consumer string
AutoAck bool
Exclusive bool
NoLocal bool
NoWait bool
Args amqpDriver.Table
}

const messagepack = "application/x-msgpack"

// Start establishes a session with an AMQP server given the provided options.
func (amqp *AMQP) Start(options Options) error {
func (amqp *AMQP) Start(options Options) (int, error) {
conn, err := amqpDriver.Dial(options.ConnectionURL)
amqp.Connection = conn
amqp.Queue.Connection = conn
amqp.Exchange.Connection = conn
return err
*amqp.MaxConnID++
(*amqp.Connections)[*amqp.MaxConnID] = conn
return *amqp.MaxConnID, err
}

// GetConn gets an initialised connection by ID, or returns the last initialised one if ID is 0
func (amqp *AMQP) GetConn(connID int) (*amqpDriver.Connection, error) {
if connID == 0 {
conn := (*amqp.Connections)[*amqp.MaxConnID]
if conn == nil {
return &amqpDriver.Connection{}, fmt.Errorf("connection not initialised")
}
return conn, nil
}

conn := (*amqp.Connections)[connID]
if conn == nil {
return &amqpDriver.Connection{}, fmt.Errorf("connection with ID %d not initialised", connID)
}
return conn, nil
}

// Publish delivers the payload using options provided.
func (amqp *AMQP) Publish(options PublishOptions) error {
ch, err := amqp.Connection.Channel()
conn, err := amqp.GetConn(options.ConnectionID)
if err != nil {
return err
}
ch, err := conn.Channel()
if err != nil {
return err
}
Expand Down Expand Up @@ -141,7 +165,11 @@ func (amqp *AMQP) Publish(options PublishOptions) error {

// Listen binds to an AMQP queue in order to receive message(s) as they are received.
func (amqp *AMQP) Listen(options ListenOptions) error {
ch, err := amqp.Connection.Channel()
conn, err := amqp.GetConn(options.ConnectionID)
if err != nil {
return err
}
ch, err := conn.Channel()
if err != nil {
return err
}
Expand Down Expand Up @@ -171,12 +199,22 @@ func (amqp *AMQP) Listen(options ListenOptions) error {
}

func init() {
queue := Queue{}
exchange := Exchange{}
connections := make(map[int]*amqpDriver.Connection)
maxConnID := 0
queue := Queue{
Connections: &connections,
MaxConnID: &maxConnID,
}
exchange := Exchange{
Connections: &connections,
MaxConnID: &maxConnID,
}
generalAMQP := AMQP{
Version: version,
Queue: &queue,
Exchange: &exchange,
Version: version,
Connections: &connections,
MaxConnID: &maxConnID,
Queue: &queue,
Exchange: &exchange,
}

modules.Register("k6/x/amqp", &generalAMQP)
Expand Down
34 changes: 34 additions & 0 deletions examples/connection-per-vu.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import Amqp from 'k6/x/amqp'
import exec from 'k6/execution'

export const options = {
vus: 10,
duration: '30s',
}

const url = "amqp://guest:guest@localhost:5672/"
const connIds = new Map()

function getConnectionId (vuId) {
if (!connIds.has(vuId)) {
const connectionId = Amqp.start({ connection_url: url })
connIds.set(vuId, connectionId)
return connectionId
}
return connIds.get(vuId)
}

export default function () {
console.log("K6 amqp extension enabled, version: " + Amqp.version)

const connectionId = getConnectionId(exec.vu.idInInstance)
const queueName = 'K6 queue'

Amqp.publish({
connection_id: connectionId,
queue_name: queueName,
exchange: '',
content_type: 'text/plain',
body: 'Ping from k6'
})
}
72 changes: 58 additions & 14 deletions exchanges.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
package amqp

import (
"fmt"

amqpDriver "github.com/rabbitmq/amqp091-go"
)

// Exchange defines a connection to publish/subscribe destinations.
type Exchange struct {
Version string
Connection *amqpDriver.Connection
Version string
Connections *map[int]*amqpDriver.Connection
MaxConnID *int
}

// ExchangeOptions defines configuration settings for accessing an exchange.
Expand All @@ -17,17 +20,24 @@ type ExchangeOptions struct {

// ExchangeDeclareOptions provides options when declaring (creating) an exchange.
type ExchangeDeclareOptions struct {
Name string
Kind string
Durable bool
AutoDelete bool
Internal bool
NoWait bool
Args amqpDriver.Table
ConnectionID int
Name string
Kind string
Durable bool
AutoDelete bool
Internal bool
NoWait bool
Args amqpDriver.Table
}

// ExchangeDeleteOptions provides options when deleting an exchange.
type ExchangeDeleteOptions struct {
ConnectionID int
}

// ExchangeBindOptions provides options when binding (subscribing) one exchange to another.
type ExchangeBindOptions struct {
ConnectionID int
DestinationExchangeName string
SourceExchangeName string
RoutingKey string
Expand All @@ -37,16 +47,38 @@ type ExchangeBindOptions struct {

// ExchangeUnbindOptions provides options when unbinding (unsubscribing) one exchange from another.
type ExchangeUnbindOptions struct {
ConnectionID int
DestinationExchangeName string
SourceExchangeName string
RoutingKey string
NoWait bool
Args amqpDriver.Table
}

// GetConn gets an initialised connection by ID, or returns the last initialised one if ID is 0
func (exchange *Exchange) GetConn(connID int) (*amqpDriver.Connection, error) {
if connID == 0 {
conn := (*exchange.Connections)[*exchange.MaxConnID]
if conn == nil {
return &amqpDriver.Connection{}, fmt.Errorf("connection not initialised")
}
return conn, nil
}

conn := (*exchange.Connections)[connID]
if conn == nil {
return &amqpDriver.Connection{}, fmt.Errorf("connection with ID %d not initialised", connID)
}
return conn, nil
}

// Declare creates a new exchange given the provided options.
func (exchange *Exchange) Declare(options ExchangeDeclareOptions) error {
ch, err := exchange.Connection.Channel()
conn, err := exchange.GetConn(options.ConnectionID)
if err != nil {
return err
}
ch, err := conn.Channel()
if err != nil {
return err
}
Expand All @@ -65,8 +97,12 @@ func (exchange *Exchange) Declare(options ExchangeDeclareOptions) error {
}

// Delete removes an exchange from the remote server given the exchange name.
func (exchange *Exchange) Delete(name string) error {
ch, err := exchange.Connection.Channel()
func (exchange *Exchange) Delete(name string, options ExchangeDeleteOptions) error {
conn, err := exchange.GetConn(options.ConnectionID)
if err != nil {
return err
}
ch, err := conn.Channel()
if err != nil {
return err
}
Expand All @@ -82,7 +118,11 @@ func (exchange *Exchange) Delete(name string) error {

// Bind subscribes one exchange to another.
func (exchange *Exchange) Bind(options ExchangeBindOptions) error {
ch, err := exchange.Connection.Channel()
conn, err := exchange.GetConn(options.ConnectionID)
if err != nil {
return err
}
ch, err := conn.Channel()
if err != nil {
return err
}
Expand All @@ -100,7 +140,11 @@ func (exchange *Exchange) Bind(options ExchangeBindOptions) error {

// Unbind removes a subscription from one exchange to another.
func (exchange *Exchange) Unbind(options ExchangeUnbindOptions) error {
ch, err := exchange.Connection.Channel()
conn, err := exchange.GetConn(options.ConnectionID)
if err != nil {
return err
}
ch, err := conn.Channel()
if err != nil {
return err
}
Expand Down
Loading