Skip to content
This repository has been archived by the owner on Dec 8, 2023. It is now read-only.

Commit

Permalink
Singularize queue and exchange structures
Browse files Browse the repository at this point in the history
  • Loading branch information
lxkuz committed Jul 23, 2021
1 parent 2461cfe commit 6097daf
Show file tree
Hide file tree
Showing 5 changed files with 134 additions and 34 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ $ ./build.sh && ./k6 run my-test-script.js

```javascript
import Amqp from 'k6/x/amqp';
import Queues from 'k6/x/amqp/queues';
import Queue from 'k6/x/amqp/queue';

export default function () {
console.log("K6 amqp extension enabled, version: " + Amqp.version)
Expand All @@ -57,7 +57,7 @@ export default function () {

const queueName = 'K6 general'

Queues.declare({
Queue.declare({
name: queueName,
// durable: false,
// delete_when_unused: false,
Expand Down
20 changes: 10 additions & 10 deletions amqp.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ const version = "v0.0.1"
type Amqp struct {
Version string
Connection *amqpDriver.Connection
Queues *Queues
Exchanges *Exchanges
Queue *Queue
Exchange *Exchange
}

type AmqpOptions struct {
Expand Down Expand Up @@ -51,8 +51,8 @@ type ListenOptions struct {
func (amqp *Amqp) Start(options AmqpOptions) error {
conn, err := amqpDriver.Dial(options.ConnectionUrl)
amqp.Connection = conn
amqp.Queues.Connection = conn
amqp.Exchanges.Connection = conn
amqp.Queue.Connection = conn
amqp.Exchange.Connection = conn
return err
}

Expand Down Expand Up @@ -105,15 +105,15 @@ func (amqp *Amqp) Listen(options ListenOptions) error {

func init() {

queues := Queues{}
exchanges := Exchanges{}
queue := Queue{}
exchange := Exchange{}
generalAmqp := Amqp{
Version: version,
Queues: &queues,
Exchanges: &exchanges,
Queue: &queue,
Exchange: &exchange,
}

modules.Register("k6/x/amqp", &generalAmqp)
modules.Register("k6/x/amqp/queue", &queues)
modules.Register("k6/x/amqp/exchange", &exchanges)
modules.Register("k6/x/amqp/queue", &queue)
modules.Register("k6/x/amqp/exchange", &exchange)
}
100 changes: 100 additions & 0 deletions channels.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package amqp

import (
amqpDriver "github.com/streadway/amqp"
)

type Channels struct {
Version string
Connection *amqpDriver.Connection
}

type ChannelOptions struct {
ConnectionUrl string
}

type ChannelDeclareOptions struct {
Name string
Kind string
Durable bool
AutoDelete bool
Internal bool
NoWait bool
Args amqpDriver.Table
}

type ChannelBindOptions struct {
DestinationChannelName string
SourceChannelName string
RoutingKey string
NoWait bool
Args amqpDriver.Table
}

type ChannelUnindOptions struct {
DestinationChannelName string
SourceChannelName string
RoutingKey string
NoWait bool
Args amqpDriver.Table
}

func (Channels *Channels) Declare(options ChannelDeclareOptions) error {
ch, err := Channels.Connection.Channel()
if err != nil {
return err
}
defer ch.Close()
return ch.ChannelDeclare(
options.Name,
options.Kind,
options.Durable,
options.AutoDelete,
options.Internal,
options.NoWait,
options.Args,
)
}

func (Channels *Channels) Delete(name string) error {
ch, err := Channels.Connection.Channel()
if err != nil {
return err
}
defer ch.Close()
return ch.ChannelDelete(
name,
false, // ifUnused
false, // noWait
)
}

func (Channels *Channels) Bind(options ChannelBindOptions) error {
ch, err := Channels.Connection.Channel()
if err != nil {
return err
}
defer ch.Close()
return ch.ChannelBind(
options.DestinationChannelName,
options.RoutingKey,
options.SourceChannelName,
options.NoWait,
options.Args,
)
}

func (Channels *Channels) Unbind(options ChannelUnindOptions) error {
ch, err := Channels.Connection.Channel()
if err != nil {
return err
}
defer ch.Close()
return ch.ChannelUnbind(
options.DestinationChannelName,
options.RoutingKey,
options.SourceChannelName,
options.NoWait,
options.Args,
)
}
18 changes: 9 additions & 9 deletions exchanges.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
amqpDriver "github.com/streadway/amqp"
)

type Exchanges struct {
type Exchange struct {
Version string
Connection *amqpDriver.Connection
}
Expand Down Expand Up @@ -39,8 +39,8 @@ type ExchangeUnindOptions struct {
Args amqpDriver.Table
}

func (exchanges *Exchanges) Declare(options ExchangeDeclareOptions) error {
ch, err := exchanges.Connection.Channel()
func (exchange *Exchange) Declare(options ExchangeDeclareOptions) error {
ch, err := exchange.Connection.Channel()
if err != nil {
return err
}
Expand All @@ -56,8 +56,8 @@ func (exchanges *Exchanges) Declare(options ExchangeDeclareOptions) error {
)
}

func (exchanges *Exchanges) Delete(name string) error {
ch, err := exchanges.Connection.Channel()
func (exchange *Exchange) Delete(name string) error {
ch, err := exchange.Connection.Channel()
if err != nil {
return err
}
Expand All @@ -69,8 +69,8 @@ func (exchanges *Exchanges) Delete(name string) error {
)
}

func (exchanges *Exchanges) Bind(options ExchangeBindOptions) error {
ch, err := exchanges.Connection.Channel()
func (exchange *Exchange) Bind(options ExchangeBindOptions) error {
ch, err := exchange.Connection.Channel()
if err != nil {
return err
}
Expand All @@ -84,8 +84,8 @@ func (exchanges *Exchanges) Bind(options ExchangeBindOptions) error {
)
}

func (exchanges *Exchanges) Unbind(options ExchangeUnindOptions) error {
ch, err := exchanges.Connection.Channel()
func (exchange *Exchange) Unbind(options ExchangeUnindOptions) error {
ch, err := exchange.Connection.Channel()
if err != nil {
return err
}
Expand Down
26 changes: 13 additions & 13 deletions queues.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
amqpDriver "github.com/streadway/amqp"
)

type Queues struct {
type Queue struct {
Version string
Connection *amqpDriver.Connection
}
Expand Down Expand Up @@ -37,8 +37,8 @@ type QueueUnindOptions struct {
Args amqpDriver.Table
}

func (queues *Queues) Declare(options DeclareOptions) (amqpDriver.Queue, error) {
ch, err := queues.Connection.Channel()
func (queue *Queue) Declare(options DeclareOptions) (amqpDriver.Queue, error) {
ch, err := queue.Connection.Channel()
if err != nil {
return amqpDriver.Queue{}, err
}
Expand All @@ -53,17 +53,17 @@ func (queues *Queues) Declare(options DeclareOptions) (amqpDriver.Queue, error)
)
}

func (queues *Queues) Inspect(name string) (amqpDriver.Queue, error) {
ch, err := queues.Connection.Channel()
func (queue *Queue) Inspect(name string) (amqpDriver.Queue, error) {
ch, err := queue.Connection.Channel()
if err != nil {
return amqpDriver.Queue{}, err
}
defer ch.Close()
return ch.QueueInspect(name)
}

func (queues *Queues) Delete(name string) error {
ch, err := queues.Connection.Channel()
func (queue *Queue) Delete(name string) error {
ch, err := queue.Connection.Channel()
if err != nil {
return err
}
Expand All @@ -77,8 +77,8 @@ func (queues *Queues) Delete(name string) error {
return err
}

func (queues *Queues) Bind(options QueueBindOptions) error {
ch, err := queues.Connection.Channel()
func (queue *Queue) Bind(options QueueBindOptions) error {
ch, err := queue.Connection.Channel()
if err != nil {
return err
}
Expand All @@ -92,8 +92,8 @@ func (queues *Queues) Bind(options QueueBindOptions) error {
)
}

func (queues *Queues) Unbind(options QueueUnindOptions) error {
ch, err := queues.Connection.Channel()
func (queue *Queue) Unbind(options QueueUnindOptions) error {
ch, err := queue.Connection.Channel()
if err != nil {
return err
}
Expand All @@ -106,8 +106,8 @@ func (queues *Queues) Unbind(options QueueUnindOptions) error {
)
}

func (queues *Queues) Purge(name string, noWait bool) (int, error) {
ch, err := queues.Connection.Channel()
func (queue *Queue) Purge(name string, noWait bool) (int, error) {
ch, err := queue.Connection.Channel()
if err != nil {
return 0, err
}
Expand Down

0 comments on commit 6097daf

Please sign in to comment.