Skip to content

Commit

Permalink
Add ArrayBuffer support in k6/ws module
Browse files Browse the repository at this point in the history
I experimented with overriding the first argument passed to the
'message' handler to be a String object with a `buffer` data property,
but it doesn't expose the string primitive properties (.length, etc.) so
it would be a breaking change.

Passing the ArrayBuffer as the second argument is backwards compatible,
and hopefully doesn't cause a doubling of memory usage (I'm expecting it
to be garbage collected if the argument isn't used). We could detect the
number of arguments defined on the handler and only pass the ArrayBuffer
if expected, but this would be awkward to implement.

Part of #1020
  • Loading branch information
Ivan Mirić committed Apr 6, 2021
1 parent a5c7378 commit 9fd189d
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 6 deletions.
27 changes: 21 additions & 6 deletions js/modules/k6/ws/ws.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,8 @@ func (*WS) Connect(ctx context.Context, url string, args ...goja.Value) (*WSHTTP
Tags: socket.sampleTags,
Value: 1,
})
socket.handleEvent("message", rt.ToValue(string(readData)))
ab := rt.NewArrayBuffer(readData)
socket.handleEvent("message", rt.ToValue(string(readData)), rt.ToValue(&ab))

case readErr := <-readErrChan:
socket.handleEvent("error", rt.ToValue(readErr))
Expand Down Expand Up @@ -334,13 +335,27 @@ func (s *Socket) handleEvent(event string, args ...goja.Value) {
}
}

func (s *Socket) Send(message string) {
// NOTE: No binary message support for the time being since goja doesn't
// support typed arrays.
// Send writes the given string or ArrayBuffer message to the connection.
func (s *Socket) Send(message interface{}) {
rt := common.GetRuntime(s.ctx)

writeData := []byte(message)
if err := s.conn.WriteMessage(websocket.TextMessage, writeData); err != nil {
var (
msgType int
msg []byte
)
switch m := message.(type) {
case string:
msgType = websocket.TextMessage
msg = []byte(m)
case goja.ArrayBuffer:
msgType = websocket.BinaryMessage
msg = m.Bytes()
default:
err := fmt.Errorf("unsupported message type: %T, expected string or ArrayBuffer", message)
common.Throw(common.GetRuntime(s.ctx), err)
}

if err := s.conn.WriteMessage(msgType, msg); err != nil {
s.handleEvent("error", rt.ToValue(err))
}

Expand Down
68 changes: 68 additions & 0 deletions js/modules/k6/ws/ws_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,18 @@ func TestSession(t *testing.T) {
assert.NoError(t, err)
})

t.Run("send_err", func(t *testing.T) {
_, err := rt.RunString(sr(`
var res = ws.connect("WSBIN_URL/ws-echo", function(socket){
socket.on("open", function() {
socket.send(1);
})
});
`))
require.Error(t, err)
require.Contains(t, err.Error(), "unsupported message type: int64, expected string or ArrayBuffer")
})

samplesBuf := stats.GetBufferedSamples(samples)
assertSessionMetricsEmitted(t, samplesBuf, "", sr("WSBIN_URL/ws-echo"), 101, "")
assertMetricEmitted(t, metrics.WSMessagesSent, samplesBuf, sr("WSBIN_URL/ws-echo"))
Expand Down Expand Up @@ -347,6 +359,62 @@ func TestSession(t *testing.T) {
}
}

func TestSocketSendBinary(t *testing.T) {
t.Parallel()
tb := httpmultibin.NewHTTPMultiBin(t)
defer tb.Cleanup()
sr := tb.Replacer.Replace

root, err := lib.NewGroup("", nil)
assert.NoError(t, err)

rt := goja.New()
rt.SetFieldNameMapper(common.FieldNameMapper{})
samples := make(chan stats.SampleContainer, 1000)
state := &lib.State{
Group: root,
Dialer: tb.Dialer,
Options: lib.Options{
SystemTags: stats.NewSystemTagSet(
stats.TagURL,
stats.TagProto,
stats.TagStatus,
stats.TagSubproto,
),
},
Samples: samples,
TLSConfig: tb.TLSClientConfig,
}

ctx := context.Background()
ctx = lib.WithState(ctx, state)
ctx = common.WithRuntime(ctx, rt)

err = rt.Set("ws", common.Bind(rt, New(), &ctx))
assert.NoError(t, err)

_, err = rt.RunString(sr(`
var res = ws.connect('WSBIN_URL/ws-echo', function(socket){
var data = new Uint8Array([104, 101, 108, 108, 111]); // 'hello'
socket.on('open', function() {
socket.send(data.buffer);
})
socket.on('message', function (msg, msgBin){
if (msg !== 'hello') {
throw new Error('received unexpected message: ' + msg);
}
let decText = String.fromCharCode.apply(null, new Uint8Array(msgBin));
decText = decodeURIComponent(escape(decText));
if (decText !== 'hello') {
throw new Error('received unexpected binary message: ' + decText);
}
socket.close()
});
});
`))
assert.NoError(t, err)
}

func TestErrors(t *testing.T) {
t.Parallel()
tb := httpmultibin.NewHTTPMultiBin(t)
Expand Down

0 comments on commit 9fd189d

Please sign in to comment.