Skip to content

Commit

Permalink
Call WS message handlers depending on the message type
Browse files Browse the repository at this point in the history
  • Loading branch information
Ivan Mirić committed Apr 6, 2021
1 parent e44fb6d commit 5e29bdf
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 21 deletions.
23 changes: 15 additions & 8 deletions js/modules/k6/ws/ws.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,11 @@ type WSHTTPResponse struct {
Error string `json:"error"`
}

type message struct {
mtype int // message type consts as defined in gorilla/websocket/conn.go
data []byte
}

const writeWait = 10 * time.Second

func New() *WS {
Expand Down Expand Up @@ -245,7 +250,7 @@ func (*WS) Connect(ctx context.Context, url string, args ...goja.Value) (*WSHTTP
conn.SetPingHandler(func(msg string) error { pingChan <- msg; return nil })
conn.SetPongHandler(func(pingID string) error { pongChan <- pingID; return nil })

readDataChan := make(chan []byte)
readDataChan := make(chan *message)
readCloseChan := make(chan int)
readErrChan := make(chan error)

Expand Down Expand Up @@ -285,17 +290,19 @@ func (*WS) Connect(ctx context.Context, url string, args ...goja.Value) (*WSHTTP
socket.trackPong(pingID)
socket.handleEvent("pong")

case readData := <-readDataChan:
case msg := <-readDataChan:
stats.PushIfNotDone(ctx, socket.samplesOutput, stats.Sample{
Metric: metrics.WSMessagesReceived,
Time: time.Now(),
Tags: socket.sampleTags,
Value: 1,
})
socket.handleEvent("message", rt.ToValue(string(readData)))
if _, ok := socket.eventHandlers["binaryMessage"]; ok {
ab := rt.NewArrayBuffer(readData)

if msg.mtype == websocket.BinaryMessage {
ab := rt.NewArrayBuffer(msg.data)
socket.handleEvent("binaryMessage", rt.ToValue(&ab))
} else {
socket.handleEvent("message", rt.ToValue(string(msg.data)))
}

case readErr := <-readErrChan:
Expand Down Expand Up @@ -505,9 +512,9 @@ func (s *Socket) closeConnection(code int) error {
}

// Wraps conn.ReadMessage in a channel
func (s *Socket) readPump(readChan chan []byte, errorChan chan error, closeChan chan int) {
func (s *Socket) readPump(readChan chan *message, errorChan chan error, closeChan chan int) {
for {
_, message, err := s.conn.ReadMessage()
messageType, data, err := s.conn.ReadMessage()
if err != nil {
if websocket.IsUnexpectedCloseError(
err, websocket.CloseNormalClosure, websocket.CloseGoingAway) {
Expand All @@ -530,7 +537,7 @@ func (s *Socket) readPump(readChan chan []byte, errorChan chan error, closeChan
}

select {
case readChan <- message:
case readChan <- &message{messageType, data}:
case <-s.done:
return
}
Expand Down
17 changes: 4 additions & 13 deletions js/modules/k6/ws/ws_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,22 +382,16 @@ func TestSocketSendBinary(t *testing.T) {
assert.NoError(t, err)

_, err = rt.RunString(sr(`
var gotMsg = false, gotBinMsg = false;
var gotMsg = false;
var res = ws.connect('WSBIN_URL/ws-echo', function(socket){
var data = new Uint8Array([104, 101, 108, 108, 111]); // 'hello'
socket.on('open', function() {
socket.sendBinary(data.buffer);
})
socket.on('message', function(msg) {
socket.on('binaryMessage', function(msg) {
gotMsg = true;
if (msg !== 'hello') {
throw new Error('received unexpected message: ' + msg);
}
});
socket.on('binaryMessage', function(msgBin) {
gotBinMsg = true;
let decText = String.fromCharCode.apply(null, new Uint8Array(msgBin));
let decText = String.fromCharCode.apply(null, new Uint8Array(msg));
decText = decodeURIComponent(escape(decText));
if (decText !== 'hello') {
throw new Error('received unexpected binary message: ' + decText);
Expand All @@ -406,9 +400,6 @@ func TestSocketSendBinary(t *testing.T) {
});
});
if (!gotMsg) {
throw new Error("the 'message' handler wasn't called")
}
if (!gotBinMsg) {
throw new Error("the 'binaryMessage' handler wasn't called")
}
`))
Expand Down Expand Up @@ -675,7 +666,7 @@ func TestReadPump(t *testing.T) {
_ = conn.Close()
}()

msgChan := make(chan []byte)
msgChan := make(chan *message)
errChan := make(chan error)
closeChan := make(chan int)
s := &Socket{conn: conn}
Expand Down

0 comments on commit 5e29bdf

Please sign in to comment.