Skip to content

Commit

Permalink
Call WS message handlers depending on the message type
Browse files Browse the repository at this point in the history
  • Loading branch information
Ivan Mirić committed Apr 28, 2021
1 parent ed06a08 commit c614ab5
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 21 deletions.
23 changes: 15 additions & 8 deletions js/modules/k6/ws/ws.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,11 @@ type WSHTTPResponse struct ***REMOVED***
Error string `json:"error"`
***REMOVED***

type message struct ***REMOVED***
mtype int // message type consts as defined in gorilla/websocket/conn.go
data []byte
***REMOVED***

const writeWait = 10 * time.Second

func New() *WS ***REMOVED***
Expand Down Expand Up @@ -240,7 +245,7 @@ func (*WS) Connect(ctx context.Context, url string, args ...goja.Value) (*WSHTTP
conn.SetPingHandler(func(msg string) error ***REMOVED*** pingChan <- msg; return nil ***REMOVED***)
conn.SetPongHandler(func(pingID string) error ***REMOVED*** pongChan <- pingID; return nil ***REMOVED***)

readDataChan := make(chan []byte)
readDataChan := make(chan *message)
readCloseChan := make(chan int)
readErrChan := make(chan error)

Expand Down Expand Up @@ -280,17 +285,19 @@ func (*WS) Connect(ctx context.Context, url string, args ...goja.Value) (*WSHTTP
socket.trackPong(pingID)
socket.handleEvent("pong")

case readData := <-readDataChan:
case msg := <-readDataChan:
stats.PushIfNotDone(ctx, socket.samplesOutput, stats.Sample***REMOVED***
Metric: metrics.WSMessagesReceived,
Time: time.Now(),
Tags: socket.sampleTags,
Value: 1,
***REMOVED***)
socket.handleEvent("message", rt.ToValue(string(readData)))
if _, ok := socket.eventHandlers["binaryMessage"]; ok ***REMOVED***
ab := rt.NewArrayBuffer(readData)

if msg.mtype == websocket.BinaryMessage ***REMOVED***
ab := rt.NewArrayBuffer(msg.data)
socket.handleEvent("binaryMessage", rt.ToValue(&ab))
***REMOVED*** else ***REMOVED***
socket.handleEvent("message", rt.ToValue(string(msg.data)))
***REMOVED***

case readErr := <-readErrChan:
Expand Down Expand Up @@ -500,9 +507,9 @@ func (s *Socket) closeConnection(code int) error ***REMOVED***
***REMOVED***

// Wraps conn.ReadMessage in a channel
func (s *Socket) readPump(readChan chan []byte, errorChan chan error, closeChan chan int) ***REMOVED***
func (s *Socket) readPump(readChan chan *message, errorChan chan error, closeChan chan int) ***REMOVED***
for ***REMOVED***
_, message, err := s.conn.ReadMessage()
messageType, data, err := s.conn.ReadMessage()
if err != nil ***REMOVED***
if websocket.IsUnexpectedCloseError(
err, websocket.CloseNormalClosure, websocket.CloseGoingAway) ***REMOVED***
Expand All @@ -525,7 +532,7 @@ func (s *Socket) readPump(readChan chan []byte, errorChan chan error, closeChan
***REMOVED***

select ***REMOVED***
case readChan <- message:
case readChan <- &message***REMOVED***messageType, data***REMOVED***:
case <-s.done:
return
***REMOVED***
Expand Down
17 changes: 4 additions & 13 deletions js/modules/k6/ws/ws_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,22 +382,16 @@ func TestSocketSendBinary(t *testing.T) ***REMOVED***
assert.NoError(t, err)

_, err = rt.RunString(sr(`
var gotMsg = false, gotBinMsg = false;
var gotMsg = false;
var res = ws.connect('WSBIN_URL/ws-echo', function(socket)***REMOVED***
var data = new Uint8Array([104, 101, 108, 108, 111]); // 'hello'
socket.on('open', function() ***REMOVED***
socket.sendBinary(data.buffer);
***REMOVED***)
socket.on('message', function(msg) ***REMOVED***
socket.on('binaryMessage', function(msg) ***REMOVED***
gotMsg = true;
if (msg !== 'hello') ***REMOVED***
throw new Error('received unexpected message: ' + msg);
***REMOVED***
***REMOVED***);
socket.on('binaryMessage', function(msgBin) ***REMOVED***
gotBinMsg = true;
let decText = String.fromCharCode.apply(null, new Uint8Array(msgBin));
let decText = String.fromCharCode.apply(null, new Uint8Array(msg));
decText = decodeURIComponent(escape(decText));
if (decText !== 'hello') ***REMOVED***
throw new Error('received unexpected binary message: ' + decText);
Expand All @@ -406,9 +400,6 @@ func TestSocketSendBinary(t *testing.T) ***REMOVED***
***REMOVED***);
***REMOVED***);
if (!gotMsg) ***REMOVED***
throw new Error("the 'message' handler wasn't called")
***REMOVED***
if (!gotBinMsg) ***REMOVED***
throw new Error("the 'binaryMessage' handler wasn't called")
***REMOVED***
`))
Expand Down Expand Up @@ -675,7 +666,7 @@ func TestReadPump(t *testing.T) ***REMOVED***
_ = conn.Close()
***REMOVED***()

msgChan := make(chan []byte)
msgChan := make(chan *message)
errChan := make(chan error)
closeChan := make(chan int)
s := &Socket***REMOVED***conn: conn***REMOVED***
Expand Down

0 comments on commit c614ab5

Please sign in to comment.