From 6097daf1c844ed4a33c40aed8934f2cc33269e51 Mon Sep 17 00:00:00 2001 From: Alexey Kuznetsov Date: Fri, 23 Jul 2021 18:13:56 +0500 Subject: [PATCH] Singularize queue and exchange structures --- README.md | 4 +-- amqp.go | 20 +++++------ channels.go | 100 +++++++++++++++++++++++++++++++++++++++++++++++++++ exchanges.go | 18 +++++----- queues.go | 26 +++++++------- 5 files changed, 134 insertions(+), 34 deletions(-) create mode 100644 channels.go diff --git a/README.md b/README.md index 0d1d2a5..408453c 100644 --- a/README.md +++ b/README.md @@ -45,7 +45,7 @@ $ ./build.sh && ./k6 run my-test-script.js ```javascript import Amqp from 'k6/x/amqp'; -import Queues from 'k6/x/amqp/queues'; +import Queue from 'k6/x/amqp/queue'; export default function () { console.log("K6 amqp extension enabled, version: " + Amqp.version) @@ -57,7 +57,7 @@ export default function () { const queueName = 'K6 general' - Queues.declare({ + Queue.declare({ name: queueName, // durable: false, // delete_when_unused: false, diff --git a/amqp.go b/amqp.go index abbcaf6..dd48545 100644 --- a/amqp.go +++ b/amqp.go @@ -10,8 +10,8 @@ const version = "v0.0.1" type Amqp struct { Version string Connection *amqpDriver.Connection - Queues *Queues - Exchanges *Exchanges + Queue *Queue + Exchange *Exchange } type AmqpOptions struct { @@ -51,8 +51,8 @@ type ListenOptions struct { func (amqp *Amqp) Start(options AmqpOptions) error { conn, err := amqpDriver.Dial(options.ConnectionUrl) amqp.Connection = conn - amqp.Queues.Connection = conn - amqp.Exchanges.Connection = conn + amqp.Queue.Connection = conn + amqp.Exchange.Connection = conn return err } @@ -105,15 +105,15 @@ func (amqp *Amqp) Listen(options ListenOptions) error { func init() { - queues := Queues{} - exchanges := Exchanges{} + queue := Queue{} + exchange := Exchange{} generalAmqp := Amqp{ Version: version, - Queues: &queues, - Exchanges: &exchanges, + Queue: &queue, + Exchange: &exchange, } modules.Register("k6/x/amqp", &generalAmqp) - modules.Register("k6/x/amqp/queue", &queues) - modules.Register("k6/x/amqp/exchange", &exchanges) + modules.Register("k6/x/amqp/queue", &queue) + modules.Register("k6/x/amqp/exchange", &exchange) } diff --git a/channels.go b/channels.go new file mode 100644 index 0000000..e3478fb --- /dev/null +++ b/channels.go @@ -0,0 +1,100 @@ +package amqp + +import ( + amqpDriver "github.com/streadway/amqp" +) + +type Channels struct { + Version string + Connection *amqpDriver.Connection +} + +type ChannelOptions struct { + ConnectionUrl string +} + +type ChannelDeclareOptions struct { + Name string + Kind string + Durable bool + AutoDelete bool + Internal bool + NoWait bool + Args amqpDriver.Table +} + +type ChannelBindOptions struct { + DestinationChannelName string + SourceChannelName string + RoutingKey string + NoWait bool + Args amqpDriver.Table +} + +type ChannelUnindOptions struct { + DestinationChannelName string + SourceChannelName string + RoutingKey string + NoWait bool + Args amqpDriver.Table +} + +func (Channels *Channels) Declare(options ChannelDeclareOptions) error { + ch, err := Channels.Connection.Channel() + if err != nil { + return err + } + defer ch.Close() + return ch.ChannelDeclare( + options.Name, + options.Kind, + options.Durable, + options.AutoDelete, + options.Internal, + options.NoWait, + options.Args, + ) +} + +func (Channels *Channels) Delete(name string) error { + ch, err := Channels.Connection.Channel() + if err != nil { + return err + } + defer ch.Close() + return ch.ChannelDelete( + name, + false, // ifUnused + false, // noWait + ) +} + +func (Channels *Channels) Bind(options ChannelBindOptions) error { + ch, err := Channels.Connection.Channel() + if err != nil { + return err + } + defer ch.Close() + return ch.ChannelBind( + options.DestinationChannelName, + options.RoutingKey, + options.SourceChannelName, + options.NoWait, + options.Args, + ) +} + +func (Channels *Channels) Unbind(options ChannelUnindOptions) error { + ch, err := Channels.Connection.Channel() + if err != nil { + return err + } + defer ch.Close() + return ch.ChannelUnbind( + options.DestinationChannelName, + options.RoutingKey, + options.SourceChannelName, + options.NoWait, + options.Args, + ) +} diff --git a/exchanges.go b/exchanges.go index 9899122..7eea17c 100644 --- a/exchanges.go +++ b/exchanges.go @@ -4,7 +4,7 @@ import ( amqpDriver "github.com/streadway/amqp" ) -type Exchanges struct { +type Exchange struct { Version string Connection *amqpDriver.Connection } @@ -39,8 +39,8 @@ type ExchangeUnindOptions struct { Args amqpDriver.Table } -func (exchanges *Exchanges) Declare(options ExchangeDeclareOptions) error { - ch, err := exchanges.Connection.Channel() +func (exchange *Exchange) Declare(options ExchangeDeclareOptions) error { + ch, err := exchange.Connection.Channel() if err != nil { return err } @@ -56,8 +56,8 @@ func (exchanges *Exchanges) Declare(options ExchangeDeclareOptions) error { ) } -func (exchanges *Exchanges) Delete(name string) error { - ch, err := exchanges.Connection.Channel() +func (exchange *Exchange) Delete(name string) error { + ch, err := exchange.Connection.Channel() if err != nil { return err } @@ -69,8 +69,8 @@ func (exchanges *Exchanges) Delete(name string) error { ) } -func (exchanges *Exchanges) Bind(options ExchangeBindOptions) error { - ch, err := exchanges.Connection.Channel() +func (exchange *Exchange) Bind(options ExchangeBindOptions) error { + ch, err := exchange.Connection.Channel() if err != nil { return err } @@ -84,8 +84,8 @@ func (exchanges *Exchanges) Bind(options ExchangeBindOptions) error { ) } -func (exchanges *Exchanges) Unbind(options ExchangeUnindOptions) error { - ch, err := exchanges.Connection.Channel() +func (exchange *Exchange) Unbind(options ExchangeUnindOptions) error { + ch, err := exchange.Connection.Channel() if err != nil { return err } diff --git a/queues.go b/queues.go index 9dc837f..e362fb4 100644 --- a/queues.go +++ b/queues.go @@ -4,7 +4,7 @@ import ( amqpDriver "github.com/streadway/amqp" ) -type Queues struct { +type Queue struct { Version string Connection *amqpDriver.Connection } @@ -37,8 +37,8 @@ type QueueUnindOptions struct { Args amqpDriver.Table } -func (queues *Queues) Declare(options DeclareOptions) (amqpDriver.Queue, error) { - ch, err := queues.Connection.Channel() +func (queue *Queue) Declare(options DeclareOptions) (amqpDriver.Queue, error) { + ch, err := queue.Connection.Channel() if err != nil { return amqpDriver.Queue{}, err } @@ -53,8 +53,8 @@ func (queues *Queues) Declare(options DeclareOptions) (amqpDriver.Queue, error) ) } -func (queues *Queues) Inspect(name string) (amqpDriver.Queue, error) { - ch, err := queues.Connection.Channel() +func (queue *Queue) Inspect(name string) (amqpDriver.Queue, error) { + ch, err := queue.Connection.Channel() if err != nil { return amqpDriver.Queue{}, err } @@ -62,8 +62,8 @@ func (queues *Queues) Inspect(name string) (amqpDriver.Queue, error) { return ch.QueueInspect(name) } -func (queues *Queues) Delete(name string) error { - ch, err := queues.Connection.Channel() +func (queue *Queue) Delete(name string) error { + ch, err := queue.Connection.Channel() if err != nil { return err } @@ -77,8 +77,8 @@ func (queues *Queues) Delete(name string) error { return err } -func (queues *Queues) Bind(options QueueBindOptions) error { - ch, err := queues.Connection.Channel() +func (queue *Queue) Bind(options QueueBindOptions) error { + ch, err := queue.Connection.Channel() if err != nil { return err } @@ -92,8 +92,8 @@ func (queues *Queues) Bind(options QueueBindOptions) error { ) } -func (queues *Queues) Unbind(options QueueUnindOptions) error { - ch, err := queues.Connection.Channel() +func (queue *Queue) Unbind(options QueueUnindOptions) error { + ch, err := queue.Connection.Channel() if err != nil { return err } @@ -106,8 +106,8 @@ func (queues *Queues) Unbind(options QueueUnindOptions) error { ) } -func (queues *Queues) Purge(name string, noWait bool) (int, error) { - ch, err := queues.Connection.Channel() +func (queue *Queue) Purge(name string, noWait bool) (int, error) { + ch, err := queue.Connection.Channel() if err != nil { return 0, err }