Skip to content
This repository has been archived by the owner on Dec 8, 2023. It is now read-only.

Commit

Permalink
Merge pull request #1 from grafana/amqp-init
Browse files Browse the repository at this point in the history
Init AMQP k6 plugin
  • Loading branch information
lxkuz authored Jul 29, 2021
2 parents 78d64db + 6097daf commit f4800db
Show file tree
Hide file tree
Showing 21 changed files with 1,292 additions and 10 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
vendor
52 changes: 47 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
>
> As this is a proof of concept, it won't be supported by the k6 team.
> It may also break in the future as xk6 evolves. USE AT YOUR OWN RISK!
> Any issues with the tool should be raised [here](https://github.com/lxkuz/xk6-amqp/issues).
> Any issues with the tool should be raised [here](https://github.com/grafana/xk6-amqp/issues).
</br>
</br>
Expand Down Expand Up @@ -30,7 +30,7 @@ Then:

2. Build the binary:
```bash
$ xk6 build --with github.com/lxkuz/xk6-amqp@latest
$ xk6 build --with github.com/grafana/xk6-amqp@latest
```
## Development

Expand All @@ -45,9 +45,48 @@ $ ./build.sh && ./k6 run my-test-script.js

```javascript
import Amqp from 'k6/x/amqp';
import Queue from 'k6/x/amqp/queue';

export default function () {
console.log("K6 amqp extension enabled, version: " + Amqp.version)
const url = "amqp://guest:guest@localhost:5672/"
Amqp.start({
connection_url: url
})
console.log("Connection opened: " + url)

const queueName = 'K6 general'

Queue.declare({
name: queueName,
// durable: false,
// delete_when_unused: false,
// exclusive: false,
// no_wait: false,
// args: null
})

console.log(queueName + " queue is ready")

Amqp.publish({
queue_name: queueName,
body: "Ping from k6"
// exchange: '',
// mandatory: false,
// immediate: false,
})

const listener = function(data) { console.log('received data: ' + data) }
Amqp.listen({
queue_name: queueName,
listener: listener,
// consumer: '',
// auto_ack: true,
// exclusive: false,
// no_local: false,
// no_wait: false,
// args: null
})
}

```
Expand All @@ -70,15 +109,18 @@ $ ./k6 run script.js
scenarios: (100.00%) 1 scenario, 1 max VUs, 10m30s max duration (incl. graceful stop):
* default: 1 iterations for each of 1 VUs (maxDuration: 10m0s, gracefulStop: 30s)
INFO[0000] K6 amqp extension enabled, version: v0.0.1
INFO[0000] K6 amqp extension enabled, version: v0.0.1 source=console
INFO[0000] Connection opened: amqp://guest:guest@localhost:5672/ source=console
INFO[0000] K6 general queue is ready source=console
INFO[0000] received data: Ping from k6 source=console
running (00m00.0s), 0/1 VUs, 1 complete and 0 interrupted iterations
default ✓ [======================================] 1 VUs 00m00.0s/10m0s 1/1 iters, 1 per VU
data_received........: 0 B 0 B/s
data_sent............: 0 B 0 B/s
iteration_duration...: avg=9.64ms min=9.64ms med=9.64ms max=9.64ms p(90)=9.64ms p(95)=9.64ms
iterations...........: 1 25.017512/s
iteration_duration...: avg=31.37ms min=31.37ms med=31.37ms max=31.37ms p(90)=31.37ms p(95)=31.37ms
iterations...........: 1 30.855627/s
```

Expand Down
110 changes: 106 additions & 4 deletions amqp.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,119 @@
package amqp

import (
amqpDriver "github.com/streadway/amqp"
"go.k6.io/k6/js/modules"
)

const version = "v0.0.1"

type Amqp struct {
Version string
Version string
Connection *amqpDriver.Connection
Queue *Queue
Exchange *Exchange
}

type AmqpOptions struct {
ConnectionUrl string
}

type PublishOptions struct {
QueueName string
Body string
Exchange string
Mandatory bool
Immediate bool
}

type ConsumeOptions struct {
Consumer string
AutoAck bool
Exclusive bool
NoLocal bool
NoWait bool
Args amqpDriver.Table
}

type ListenerType func(string) error

type ListenOptions struct {
Listener ListenerType
QueueName string
Consumer string
AutoAck bool
Exclusive bool
NoLocal bool
NoWait bool
Args amqpDriver.Table
}

func (amqp *Amqp) Start(options AmqpOptions) error {
conn, err := amqpDriver.Dial(options.ConnectionUrl)
amqp.Connection = conn
amqp.Queue.Connection = conn
amqp.Exchange.Connection = conn
return err
}

func (amqp *Amqp) Publish(options PublishOptions) error {
ch, err := amqp.Connection.Channel()
if err != nil {
return err
}
defer ch.Close()

return ch.Publish(
options.Exchange,
options.QueueName,
options.Mandatory,
options.Immediate,
amqpDriver.Publishing{
ContentType: "text/plain",
Body: []byte(options.Body),
},
)
}

func (amqp *Amqp) Listen(options ListenOptions) error {
ch, err := amqp.Connection.Channel()
if err != nil {
return err
}
defer ch.Close()

msgs, err := ch.Consume(
options.QueueName,
options.Consumer,
options.AutoAck,
options.Exclusive,
options.NoLocal,
options.NoWait,
options.Args,
)
if err != nil {
return err
}

go func() {
for d := range msgs {
options.Listener(string(d.Body))
}
}()
return nil
}

func init() {
modules.Register("k6/x/amqp", &Amqp{
Version: version,
})

queue := Queue{}
exchange := Exchange{}
generalAmqp := Amqp{
Version: version,
Queue: &queue,
Exchange: &exchange,
}

modules.Register("k6/x/amqp", &generalAmqp)
modules.Register("k6/x/amqp/queue", &queue)
modules.Register("k6/x/amqp/exchange", &exchange)
}
2 changes: 1 addition & 1 deletion build.sh
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
#!/usr/bin/env sh

xk6 build --with github.com/lxkuz/xk6-amqp=.
xk6 build --with github.com/grafana/xk6-amqp=.
100 changes: 100 additions & 0 deletions channels.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package amqp

import (
amqpDriver "github.com/streadway/amqp"
)

type Channels struct {
Version string
Connection *amqpDriver.Connection
}

type ChannelOptions struct {
ConnectionUrl string
}

type ChannelDeclareOptions struct {
Name string
Kind string
Durable bool
AutoDelete bool
Internal bool
NoWait bool
Args amqpDriver.Table
}

type ChannelBindOptions struct {
DestinationChannelName string
SourceChannelName string
RoutingKey string
NoWait bool
Args amqpDriver.Table
}

type ChannelUnindOptions struct {
DestinationChannelName string
SourceChannelName string
RoutingKey string
NoWait bool
Args amqpDriver.Table
}

func (Channels *Channels) Declare(options ChannelDeclareOptions) error {
ch, err := Channels.Connection.Channel()
if err != nil {
return err
}
defer ch.Close()
return ch.ChannelDeclare(
options.Name,
options.Kind,
options.Durable,
options.AutoDelete,
options.Internal,
options.NoWait,
options.Args,
)
}

func (Channels *Channels) Delete(name string) error {
ch, err := Channels.Connection.Channel()
if err != nil {
return err
}
defer ch.Close()
return ch.ChannelDelete(
name,
false, // ifUnused
false, // noWait
)
}

func (Channels *Channels) Bind(options ChannelBindOptions) error {
ch, err := Channels.Connection.Channel()
if err != nil {
return err
}
defer ch.Close()
return ch.ChannelBind(
options.DestinationChannelName,
options.RoutingKey,
options.SourceChannelName,
options.NoWait,
options.Args,
)
}

func (Channels *Channels) Unbind(options ChannelUnindOptions) error {
ch, err := Channels.Connection.Channel()
if err != nil {
return err
}
defer ch.Close()
return ch.ChannelUnbind(
options.DestinationChannelName,
options.RoutingKey,
options.SourceChannelName,
options.NoWait,
options.Args,
)
}
22 changes: 22 additions & 0 deletions examples/bind-exchange.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import Amqp from 'k6/x/amqp';
import Exchange from 'k6/x/amqp/exchange';

export default function () {
const url = "amqp://guest:guest@localhost:5672/"
Amqp.start({
connection_url: url
})

const sourceExchangeName = 'K6 exchange'
const destinationExchangeName = 'destination K6 exchange'

Exchange.bind({
destination_exchange_name: destinationExchangeName,
routing_key: '',
source_exchange_name: sourceExchangeName,
no_wait: false,
args: null
})

console.log(destinationExchangeName + " exchange binded to " + sourceExchangeName + ' exchange')
}
22 changes: 22 additions & 0 deletions examples/bind-queue.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import Amqp from 'k6/x/amqp';
import Queue from 'k6/x/amqp/queue';

export default function () {
const url = "amqp://guest:guest@localhost:5672/"
Amqp.start({
connection_url: url
})

const queueName = 'K6 queue'
const exchangeName = 'K6 exchange'

Queue.bind({
queue_name: queueName,
routing_key: '',
exchange_name: exchangeName,
no_wait: false,
args: null
})

console.log(queueName + " queue binded to " + exchangeName)
}
25 changes: 25 additions & 0 deletions examples/declare-exchange.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import Amqp from 'k6/x/amqp';
import Exchange from 'k6/x/amqp/exchange';

export default function () {
const url = "amqp://guest:guest@localhost:5672/"
Amqp.start({
connection_url: url
})

console.log("Connection opened: " + url)

const exchangeName = 'K6 exchange'

Exchange.declare({
name: exchangeName,
kind: 'direct',
durable: false,
auto_delete: false,
internal: false,
no_wait: false,
args: null
})

console.log(exchangeName + " exchange is ready")
}
Loading

0 comments on commit f4800db

Please sign in to comment.