Skip to content
This repository has been archived by the owner on Dec 8, 2023. It is now read-only.

Commit

Permalink
Merge pull request #17 from PabloDelBarrioArnanz/main
Browse files Browse the repository at this point in the history
Add messagepack encoding
  • Loading branch information
javaducky authored Nov 9, 2022
2 parents 4f25f55 + 64c7085 commit 45dbcb6
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 1 deletion.
21 changes: 20 additions & 1 deletion amqp.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@
package amqp

import (
"encoding/json"

amqpDriver "github.com/streadway/amqp"
"github.com/vmihailenco/msgpack/v5"
"go.k6.io/k6/js/modules"
)

Expand Down Expand Up @@ -58,6 +61,8 @@ type ListenOptions struct {
Args amqpDriver.Table
}

const messagepack = "application/x-msgpack"

// Start establishes a session with an AMQP server given the provided options.
func (amqp *AMQP) Start(options Options) error {
conn, err := amqpDriver.Dial(options.ConnectionURL)
Expand All @@ -80,7 +85,21 @@ func (amqp *AMQP) Publish(options PublishOptions) error {
publishing := amqpDriver.Publishing{
Headers: options.Headers,
ContentType: options.ContentType,
Body: []byte(options.Body),
}

if options.ContentType == messagepack {
var jsonParsedBody interface{}

if err = json.Unmarshal([]byte(options.Body), &jsonParsedBody); err != nil {
return err
}

publishing.Body, err = msgpack.Marshal(jsonParsedBody)
if err != nil {
return err
}
} else {
publishing.Body = []byte(options.Body)
}

if options.Persistent {
Expand Down
41 changes: 41 additions & 0 deletions examples/test-msgpack.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import Amqp from 'k6/x/amqp';
import Queue from 'k6/x/amqp/queue';

export default function () {
console.log("K6 amqp extension enabled, version: " + Amqp.version)
const url = "amqp://guest:guest@localhost:5672/"
Amqp.start({
connection_url: url
})
console.log("Connection opened: " + url)

const queueName = 'K6 general'

Queue.declare({
name: queueName
})

console.log(queueName + " queue is ready")

let body = {
metadata: {
header1: "Performance Test Message"
},
body: {
field1: "some value"
}
}

Amqp.publish({
queue_name: queueName,
body: JSON.stringify(body),
content_type: "application/x-msgpack"
})

const listener = function(data) { console.log('received data: ' + data) }
Amqp.listen({
queue_name: queueName,
listener: listener,
auto_ack: true
})
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,6 @@ go 1.15

require (
github.com/streadway/amqp v1.0.0
github.com/vmihailenco/msgpack/v5 v5.3.5
go.k6.io/k6 v0.33.0
)
5 changes: 5 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,7 @@ github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw=
Expand All @@ -324,6 +325,10 @@ github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLY
github.com/urfave/negroni v0.3.1-0.20180130044549-22c5532ea862/go.mod h1:Meg73S6kFm/4PpbYdq35yYWoCZ9mS/YSx+lKnmiohz4=
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
github.com/valyala/fasttemplate v0.0.0-20170224212429-dcecefd839c4/go.mod h1:50wTf68f99/Zt14pr046Tgt3Lp2vLyFZKzbFXTOabXw=
github.com/vmihailenco/msgpack/v5 v5.3.5 h1:5gO0H1iULLWGhs2H5tbAHIZTV8/cYafcFOr9znI5mJU=
github.com/vmihailenco/msgpack/v5 v5.3.5/go.mod h1:7xyJ9e+0+9SaZT0Wt1RGleJXzli6Q/V5KbhBonMG9jc=
github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g=
github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds=
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
Expand Down

0 comments on commit 45dbcb6

Please sign in to comment.