Skip to content
This repository has been archived by the owner on Dec 8, 2023. It is now read-only.

Init AMQP k6 plugin #1

Merged
merged 19 commits into from
Jul 29, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
vendor
52 changes: 47 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
>
> As this is a proof of concept, it won't be supported by the k6 team.
> It may also break in the future as xk6 evolves. USE AT YOUR OWN RISK!
> Any issues with the tool should be raised [here](https://github.com/lxkuz/xk6-amqp/issues).
> Any issues with the tool should be raised [here](https://github.com/grafana/xk6-amqp/issues).
</br>
</br>
Expand Down Expand Up @@ -30,7 +30,7 @@ Then:

2. Build the binary:
```bash
$ xk6 build --with github.com/lxkuz/xk6-amqp@latest
$ xk6 build --with github.com/grafana/xk6-amqp@latest
```
## Development

Expand All @@ -45,9 +45,48 @@ $ ./build.sh && ./k6 run my-test-script.js

```javascript
import Amqp from 'k6/x/amqp';
import Queue from 'k6/x/amqp/queue';

export default function () {
console.log("K6 amqp extension enabled, version: " + Amqp.version)
const url = "amqp://guest:guest@localhost:5672/"
Amqp.start({
connection_url: url
})
console.log("Connection opened: " + url)

const queueName = 'K6 general'

Queue.declare({
name: queueName,
// durable: false,
// delete_when_unused: false,
// exclusive: false,
// no_wait: false,
// args: null
})

console.log(queueName + " queue is ready")

Amqp.publish({
queue_name: queueName,
body: "Ping from k6"
// exchange: '',
// mandatory: false,
// immediate: false,
})

const listener = function(data) { console.log('received data: ' + data) }
Amqp.listen({
queue_name: queueName,
listener: listener,
// consumer: '',
// auto_ack: true,
// exclusive: false,
// no_local: false,
// no_wait: false,
// args: null
})
}

```
Expand All @@ -70,15 +109,18 @@ $ ./k6 run script.js
scenarios: (100.00%) 1 scenario, 1 max VUs, 10m30s max duration (incl. graceful stop):
* default: 1 iterations for each of 1 VUs (maxDuration: 10m0s, gracefulStop: 30s)
INFO[0000] K6 amqp extension enabled, version: v0.0.1
INFO[0000] K6 amqp extension enabled, version: v0.0.1 source=console
INFO[0000] Connection opened: amqp://guest:guest@localhost:5672/ source=console
INFO[0000] K6 general queue is ready source=console
INFO[0000] received data: Ping from k6 source=console
running (00m00.0s), 0/1 VUs, 1 complete and 0 interrupted iterations
default ✓ [======================================] 1 VUs 00m00.0s/10m0s 1/1 iters, 1 per VU
data_received........: 0 B 0 B/s
data_sent............: 0 B 0 B/s
iteration_duration...: avg=9.64ms min=9.64ms med=9.64ms max=9.64ms p(90)=9.64ms p(95)=9.64ms
iterations...........: 1 25.017512/s
iteration_duration...: avg=31.37ms min=31.37ms med=31.37ms max=31.37ms p(90)=31.37ms p(95)=31.37ms
iterations...........: 1 30.855627/s
```

Expand Down
110 changes: 106 additions & 4 deletions amqp.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,119 @@
package amqp

import (
amqpDriver "github.com/streadway/amqp"
"go.k6.io/k6/js/modules"
)

const version = "v0.0.1"

type Amqp struct {
Version string
Version string
Connection *amqpDriver.Connection
Queue *Queue
Exchange *Exchange
}

type AmqpOptions struct {
ConnectionUrl string
}

type PublishOptions struct {
QueueName string
Body string
Exchange string
Mandatory bool
Immediate bool
}

type ConsumeOptions struct {
Consumer string
AutoAck bool
Exclusive bool
NoLocal bool
NoWait bool
Args amqpDriver.Table
}

type ListenerType func(string) error

type ListenOptions struct {
Listener ListenerType
QueueName string
Consumer string
AutoAck bool
Exclusive bool
NoLocal bool
NoWait bool
Args amqpDriver.Table
}

func (amqp *Amqp) Start(options AmqpOptions) error {
conn, err := amqpDriver.Dial(options.ConnectionUrl)
amqp.Connection = conn
amqp.Queue.Connection = conn
amqp.Exchange.Connection = conn
return err
}

func (amqp *Amqp) Publish(options PublishOptions) error {
ch, err := amqp.Connection.Channel()
if err != nil {
return err
}
defer ch.Close()

return ch.Publish(
options.Exchange,
options.QueueName,
options.Mandatory,
options.Immediate,
amqpDriver.Publishing{
ContentType: "text/plain",
Body: []byte(options.Body),
},
)
}

func (amqp *Amqp) Listen(options ListenOptions) error {
ch, err := amqp.Connection.Channel()
if err != nil {
return err
}
defer ch.Close()

msgs, err := ch.Consume(
options.QueueName,
options.Consumer,
options.AutoAck,
options.Exclusive,
options.NoLocal,
options.NoWait,
options.Args,
)
if err != nil {
return err
}

go func() {
for d := range msgs {
options.Listener(string(d.Body))
}
}()
return nil
}

func init() {
modules.Register("k6/x/amqp", &Amqp{
Version: version,
})

queue := Queue{}
exchange := Exchange{}
generalAmqp := Amqp{
Version: version,
Queue: &queue,
Exchange: &exchange,
}

modules.Register("k6/x/amqp", &generalAmqp)
modules.Register("k6/x/amqp/queue", &queue)
modules.Register("k6/x/amqp/exchange", &exchange)
}
2 changes: 1 addition & 1 deletion build.sh
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
#!/usr/bin/env sh

xk6 build --with github.com/lxkuz/xk6-amqp=.
xk6 build --with github.com/grafana/xk6-amqp=.
100 changes: 100 additions & 0 deletions channels.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package amqp

import (
amqpDriver "github.com/streadway/amqp"
)

type Channels struct {
Version string
Connection *amqpDriver.Connection
}

type ChannelOptions struct {
ConnectionUrl string
}

type ChannelDeclareOptions struct {
Name string
Kind string
Durable bool
AutoDelete bool
Internal bool
NoWait bool
Args amqpDriver.Table
}

type ChannelBindOptions struct {
DestinationChannelName string
SourceChannelName string
RoutingKey string
NoWait bool
Args amqpDriver.Table
}

type ChannelUnindOptions struct {
DestinationChannelName string
SourceChannelName string
RoutingKey string
NoWait bool
Args amqpDriver.Table
}

func (Channels *Channels) Declare(options ChannelDeclareOptions) error {
ch, err := Channels.Connection.Channel()
if err != nil {
return err
}
defer ch.Close()
return ch.ChannelDeclare(
options.Name,
options.Kind,
options.Durable,
options.AutoDelete,
options.Internal,
options.NoWait,
options.Args,
)
}

func (Channels *Channels) Delete(name string) error {
ch, err := Channels.Connection.Channel()
if err != nil {
return err
}
defer ch.Close()
return ch.ChannelDelete(
name,
false, // ifUnused
false, // noWait
)
}

func (Channels *Channels) Bind(options ChannelBindOptions) error {
ch, err := Channels.Connection.Channel()
if err != nil {
return err
}
defer ch.Close()
return ch.ChannelBind(
options.DestinationChannelName,
options.RoutingKey,
options.SourceChannelName,
options.NoWait,
options.Args,
)
}

func (Channels *Channels) Unbind(options ChannelUnindOptions) error {
ch, err := Channels.Connection.Channel()
if err != nil {
return err
}
defer ch.Close()
return ch.ChannelUnbind(
options.DestinationChannelName,
options.RoutingKey,
options.SourceChannelName,
options.NoWait,
options.Args,
)
}
22 changes: 22 additions & 0 deletions examples/bind-exchange.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import Amqp from 'k6/x/amqp';
import Exchange from 'k6/x/amqp/exchange';

export default function () {
const url = "amqp://guest:guest@localhost:5672/"
Amqp.start({
connection_url: url
})

const sourceExchangeName = 'K6 exchange'
const destinationExchangeName = 'destination K6 exchange'

Exchange.bind({
destination_exchange_name: destinationExchangeName,
routing_key: '',
source_exchange_name: sourceExchangeName,
no_wait: false,
args: null
})

console.log(destinationExchangeName + " exchange binded to " + sourceExchangeName + ' exchange')
}
22 changes: 22 additions & 0 deletions examples/bind-queue.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import Amqp from 'k6/x/amqp';
import Queue from 'k6/x/amqp/queue';

export default function () {
const url = "amqp://guest:guest@localhost:5672/"
Amqp.start({
connection_url: url
})

const queueName = 'K6 queue'
const exchangeName = 'K6 exchange'

Queue.bind({
queue_name: queueName,
routing_key: '',
exchange_name: exchangeName,
no_wait: false,
args: null
})

console.log(queueName + " queue binded to " + exchangeName)
}
25 changes: 25 additions & 0 deletions examples/declare-exchange.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import Amqp from 'k6/x/amqp';
import Exchange from 'k6/x/amqp/exchange';

export default function () {
const url = "amqp://guest:guest@localhost:5672/"
Amqp.start({
connection_url: url
})

console.log("Connection opened: " + url)

const exchangeName = 'K6 exchange'

Exchange.declare({
name: exchangeName,
kind: 'direct',
durable: false,
auto_delete: false,
internal: false,
no_wait: false,
args: null
})

console.log(exchangeName + " exchange is ready")
}
Loading