From 670cf22272b490005d46dc2bee1634de1cd06d68 Mon Sep 17 00:00:00 2001 From: MoofMonkey <11695747+MoofMonkey@users.noreply.github.com> Date: Thu, 26 May 2022 19:28:10 +0300 Subject: [PATCH] Add support for subscription keepalives in websocket client --- client/websocket.go | 46 ++++++++++++++++++++++++--------------------- 1 file changed, 25 insertions(+), 21 deletions(-) diff --git a/client/websocket.go b/client/websocket.go index 58ecbf45dd..8f283072ec 100644 --- a/client/websocket.go +++ b/client/websocket.go @@ -109,32 +109,36 @@ func (p *Client) WebsocketWithPayload(query string, initPayload map[string]inter return c.Close() }, Next: func(response interface{}) error { - var op operationMessage - err := c.ReadJSON(&op) - if err != nil { - return err - } - if op.Type != dataMsg { - if op.Type == errorMsg { - return fmt.Errorf(string(op.Payload)) - } else { - return fmt.Errorf("expected data message, got %#v", op) + for { + var op operationMessage + err := c.ReadJSON(&op) + if err != nil { + return err + } + if op.Type != dataMsg { + if op.Type == connectionKaMsg { + continue + } else if op.Type == errorMsg { + return fmt.Errorf(string(op.Payload)) + } else { + return fmt.Errorf("expected data message, got %#v", op) + } } - } - var respDataRaw Response - err = json.Unmarshal(op.Payload, &respDataRaw) - if err != nil { - return fmt.Errorf("decode: %w", err) - } + var respDataRaw Response + err = json.Unmarshal(op.Payload, &respDataRaw) + if err != nil { + return fmt.Errorf("decode: %w", err) + } - // we want to unpack even if there is an error, so we can see partial responses - unpackErr := unpack(respDataRaw.Data, response) + // we want to unpack even if there is an error, so we can see partial responses + unpackErr := unpack(respDataRaw.Data, response) - if respDataRaw.Errors != nil { - return RawJsonError{respDataRaw.Errors} + if respDataRaw.Errors != nil { + return RawJsonError{respDataRaw.Errors} + } + return unpackErr } - return unpackErr }, } }