From f93df34059a0867c7c33bc992aff0e3a5ddb0f14 Mon Sep 17 00:00:00 2001 From: Adam Date: Thu, 8 Aug 2019 10:50:45 +1000 Subject: [PATCH] send keepalive on init --- client/websocket.go | 11 +++++++++++ handler/websocket.go | 1 + handler/websocket_test.go | 7 +++++++ 3 files changed, 19 insertions(+) diff --git a/client/websocket.go b/client/websocket.go index f7452b4c978..4b6b3ae7c56 100644 --- a/client/websocket.go +++ b/client/websocket.go @@ -13,6 +13,7 @@ const ( connectionInitMsg = "connection_init" // Client -> Server startMsg = "start" // Client -> Server connectionAckMsg = "connection_ack" // Server -> Client + connectionKa = "ka" // Server -> Client dataMsg = "data" // Server -> Client errorMsg = "error" // Server -> Client ) @@ -72,10 +73,20 @@ func (p *Client) WebsocketWithPayload(query string, initPayload map[string]inter if err = c.ReadJSON(&ack); err != nil { return errorSubscription(fmt.Errorf("ack: %s", err.Error())) } + if ack.Type != connectionAckMsg { return errorSubscription(fmt.Errorf("expected ack message, got %#v", ack)) } + var ka operationMessage + if err = c.ReadJSON(&ka); err != nil { + return errorSubscription(fmt.Errorf("ka: %s", err.Error())) + } + + if ka.Type != connectionKa { + return errorSubscription(fmt.Errorf("expected ka message, got %#v", ack)) + } + if err = c.WriteJSON(operationMessage{Type: startMsg, ID: "1", Payload: requestBody}); err != nil { return errorSubscription(fmt.Errorf("start: %s", err.Error())) } diff --git a/handler/websocket.go b/handler/websocket.go index 07a1a8c2dd8..8b67a5b0d46 100644 --- a/handler/websocket.go +++ b/handler/websocket.go @@ -103,6 +103,7 @@ func (c *wsConnection) init() bool { } c.write(&operationMessage{Type: connectionAckMsg}) + c.write(&operationMessage{Type: connectionKeepAliveMsg}) case connectionTerminateMsg: c.close(websocket.CloseNormalClosure, "terminated") return false diff --git a/handler/websocket_test.go b/handler/websocket_test.go index dc3e656e5fe..17b3a2a40c0 100644 --- a/handler/websocket_test.go +++ b/handler/websocket_test.go @@ -59,6 +59,7 @@ func TestWebsocket(t *testing.T) { require.NoError(t, c.WriteJSON(&operationMessage{Type: connectionInitMsg})) require.Equal(t, connectionAckMsg, readOp(c).Type) + require.Equal(t, connectionKeepAliveMsg, readOp(c).Type) }) t.Run("client can terminate before run", func(t *testing.T) { @@ -67,6 +68,7 @@ func TestWebsocket(t *testing.T) { require.NoError(t, c.WriteJSON(&operationMessage{Type: connectionInitMsg})) require.Equal(t, connectionAckMsg, readOp(c).Type) + require.Equal(t, connectionKeepAliveMsg, readOp(c).Type) require.NoError(t, c.WriteJSON(&operationMessage{Type: connectionTerminateMsg})) @@ -80,6 +82,7 @@ func TestWebsocket(t *testing.T) { require.NoError(t, c.WriteJSON(&operationMessage{Type: connectionInitMsg})) require.Equal(t, connectionAckMsg, readOp(c).Type) + require.Equal(t, connectionKeepAliveMsg, readOp(c).Type) require.NoError(t, c.WriteJSON(&operationMessage{ Type: startMsg, @@ -98,6 +101,7 @@ func TestWebsocket(t *testing.T) { require.NoError(t, c.WriteJSON(&operationMessage{Type: connectionInitMsg})) require.Equal(t, connectionAckMsg, readOp(c).Type) + require.Equal(t, connectionKeepAliveMsg, readOp(c).Type) require.NoError(t, c.WriteJSON(&operationMessage{ Type: startMsg, @@ -138,6 +142,7 @@ func TestWebsocketWithKeepAlive(t *testing.T) { require.NoError(t, c.WriteJSON(&operationMessage{Type: connectionInitMsg})) require.Equal(t, connectionAckMsg, readOp(c).Type) + require.Equal(t, connectionKeepAliveMsg, readOp(c).Type) require.NoError(t, c.WriteJSON(&operationMessage{ Type: startMsg, @@ -174,6 +179,7 @@ func TestWebsocketInitFunc(t *testing.T) { require.NoError(t, c.WriteJSON(&operationMessage{Type: connectionInitMsg})) require.Equal(t, connectionAckMsg, readOp(c).Type) + require.Equal(t, connectionKeepAliveMsg, readOp(c).Type) }) t.Run("accept connection if WebsocketInitFunc is provided and is accepting connection", func(t *testing.T) { @@ -189,6 +195,7 @@ func TestWebsocketInitFunc(t *testing.T) { require.NoError(t, c.WriteJSON(&operationMessage{Type: connectionInitMsg})) require.Equal(t, connectionAckMsg, readOp(c).Type) + require.Equal(t, connectionKeepAliveMsg, readOp(c).Type) }) t.Run("reject connection if WebsocketInitFunc is provided and is accepting connection", func(t *testing.T) {