-
Notifications
You must be signed in to change notification settings - Fork 30
Init AMQP k6 plugin #1
Conversation
* Open connection * Publish message to queue
As we have k6 limitation and cannot read data in parallel
Add listening functionality
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Minor feedback items. All in all; looks good. 👏🏽
README.md
Outdated
@@ -2,7 +2,7 @@ | |||
> | |||
> As this is a proof of concept, it won't be supported by the k6 team. | |||
> It may also break in the future as xk6 evolves. USE AT YOUR OWN RISK! | |||
> Any issues with the tool should be raised [here](https://github.com/lxkuz/xk6-amqp/issues). | |||
> Any issues with the tool should be raised [here](https://github.com/k6io/xk6-amqp/issues). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
> Any issues with the tool should be raised [here](https://github.com/k6io/xk6-amqp/issues). | |
> Any issues with the tool should be raised [here](https://github.com/grafana/xk6-amqp/issues). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@simskij does it mean I should use github.com/grafana
instead of github.com/k6io
everywhere?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, correct
README.md
Outdated
@@ -30,7 +30,7 @@ Then: | |||
|
|||
2. Build the binary: | |||
```bash | |||
$ xk6 build --with github.com/lxkuz/xk6-amqp@latest | |||
$ xk6 build --with github.com/k6io/xk6-amqp@latest |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
$ xk6 build --with github.com/k6io/xk6-amqp@latest | |
$ xk6 build --with github.com/grafana/xk6-amqp@latest |
amqp.go
Outdated
modules.Register("k6/x/amqp/queues", &queues) | ||
modules.Register("k6/x/amqp/exchanges", &exchanges) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
modules.Register("k6/x/amqp/queues", &queues) | |
modules.Register("k6/x/amqp/exchanges", &exchanges) | |
modules.Register("k6/x/amqp/queue", &queues) | |
modules.Register("k6/x/amqp/exchange", &exchanges) |
Use singularis throughout unless the pluralization adds semantic value.
exchanges.go
Outdated
ConnectionUrl string | ||
} | ||
|
||
type EchangeDeclareOptions struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
type EchangeDeclareOptions struct { | |
type ExchangeDeclareOptions struct { |
@simskij review comments handled |
README.md
Outdated
Queues.declare({ | ||
name: queueName, | ||
// durable: false, | ||
// delete_when_unused: false, | ||
// exclusive: false, | ||
// no_wait: false, | ||
// args: null | ||
}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Singular form of Queue
queues.go
Outdated
Name string | ||
Durable bool | ||
DeleteWhenUnused bool | ||
Exclusive bool | ||
NoWait bool | ||
Args amqpDriver.Table | ||
} | ||
|
||
type QueueBindOptions struct { | ||
QueueName string | ||
ExchangeName string | ||
RoutingKey string | ||
NoWait bool | ||
Args amqpDriver.Table | ||
} | ||
|
||
type QueueUnindOptions struct { | ||
QueueName string | ||
ExchangeName string | ||
RoutingKey string | ||
Args amqpDriver.Table | ||
} | ||
|
||
func (queues *Queues) Declare(options DeclareOptions) (amqpDriver.Queue, error) { | ||
ch, err := queues.Connection.Channel() | ||
if err != nil { | ||
return amqpDriver.Queue{}, err | ||
} | ||
defer ch.Close() | ||
return ch.QueueDeclare( | ||
options.Name, | ||
options.Durable, | ||
options.DeleteWhenUnused, | ||
options.Exclusive, | ||
options.NoWait, | ||
options.Args, | ||
) | ||
} | ||
|
||
func (queues *Queues) Inspect(name string) (amqpDriver.Queue, error) { | ||
ch, err := queues.Connection.Channel() | ||
if err != nil { | ||
return amqpDriver.Queue{}, err | ||
} | ||
defer ch.Close() | ||
return ch.QueueInspect(name) | ||
} | ||
|
||
func (queues *Queues) Delete(name string) error { | ||
ch, err := queues.Connection.Channel() | ||
if err != nil { | ||
return err | ||
} | ||
defer ch.Close() | ||
_, err = ch.QueueDelete( | ||
name, | ||
false, // ifUnused | ||
false, // ifEmpty | ||
false, // noWait | ||
) | ||
return err | ||
} | ||
|
||
func (queues *Queues) Bind(options QueueBindOptions) error { | ||
ch, err := queues.Connection.Channel() | ||
if err != nil { | ||
return err | ||
} | ||
defer ch.Close() | ||
return ch.QueueBind( | ||
options.QueueName, | ||
options.RoutingKey, | ||
options.ExchangeName, | ||
options.NoWait, | ||
options.Args, | ||
) | ||
} | ||
|
||
func (queues *Queues) Unbind(options QueueUnindOptions) error { | ||
ch, err := queues.Connection.Channel() | ||
if err != nil { | ||
return err | ||
} | ||
defer ch.Close() | ||
return ch.QueueUnbind( | ||
options.QueueName, | ||
options.RoutingKey, | ||
options.ExchangeName, | ||
options.Args, | ||
) | ||
} | ||
|
||
func (queues *Queues) Purge(name string, noWait bool) (int, error) { | ||
ch, err := queues.Connection.Channel() | ||
if err != nil { | ||
return 0, err | ||
} | ||
defer ch.Close() | ||
return ch.QueuePurge(name, noWait) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Singular form of Queue
@@ -0,0 +1,100 @@ | |||
package amqp |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Singular form of Exchange
…ck-cypher Feature/add messagepack cypher
Initial AMQP functionality including: