Skip to content

Commit

Permalink
handle ping pong interval & better handle active subscription close
Browse files Browse the repository at this point in the history
  • Loading branch information
maeglindeveloper committed Apr 13, 2021
1 parent d3ba467 commit dd2fb3c
Showing 1 changed file with 37 additions and 2 deletions.
39 changes: 37 additions & 2 deletions graphql/handler/transport/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type (
Upgrader websocket.Upgrader
InitFunc WebsocketInitFunc
KeepAlivePingInterval time.Duration
PingPongInterval time.Duration
}
wsConnection struct {
Websocket
Expand All @@ -42,6 +43,7 @@ type (
active map[string]context.CancelFunc
mu sync.Mutex
keepAliveTicker *time.Ticker
pingPongTicker *time.Ticker
exec graphql.GraphExecutor

initPayload InitPayload
Expand Down Expand Up @@ -138,22 +140,38 @@ func (c *wsConnection) run() {
ctx, cancel := context.WithCancel(c.ctx)
defer func() {
cancel()
c.close(websocket.CloseAbnormalClosure, "unexpected closure")
}()

// Create a timer that will fire every interval to keep the connection alive.
if c.KeepAlivePingInterval != 0 {
c.mu.Lock()
c.keepAliveTicker = time.NewTicker(c.KeepAlivePingInterval)
c.mu.Unlock()

go c.keepAlive(ctx)
}

// Create a timer that will fire every interval a ping message that should
// receive a pong (SetPongHandler in init() function)
if c.PingPongInterval != 0 {

pongWait := 2 * c.PingPongInterval
c.conn.SetReadDeadline(time.Now().Add(pongWait))
c.conn.SetPongHandler(func(string) error {
return c.conn.SetReadDeadline(time.Now().UTC().Add(pongWait))
})

c.mu.Lock()
c.pingPongTicker = time.NewTicker(c.PingPongInterval)
c.mu.Unlock()

go c.ping(ctx)
}

for {
start := graphql.Now()
message := c.readOp()
if message == nil {
c.close(websocket.CloseAbnormalClosure, "unexpected closure")
return
}

Expand Down Expand Up @@ -190,6 +208,20 @@ func (c *wsConnection) keepAlive(ctx context.Context) {
}
}

func (c *wsConnection) ping(ctx context.Context) {
for {
select {
case <-ctx.Done():
c.pingPongTicker.Stop()
return
case <-c.pingPongTicker.C:
c.mu.Lock()
c.conn.WriteMessage(websocket.PingMessage, nil)
c.mu.Unlock()
}
}
}

func (c *wsConnection) subscribe(start time.Time, message *operationMessage) {
ctx := graphql.StartOperationTrace(c.ctx)
var params *graphql.RawParams
Expand Down Expand Up @@ -311,6 +343,9 @@ func (c *wsConnection) readOp() *operationMessage {
func (c *wsConnection) close(closeCode int, message string) {
c.mu.Lock()
_ = c.conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(closeCode, message))
for key, closer := range c.active {
closer()
}
c.mu.Unlock()
_ = c.conn.Close()
}

0 comments on commit dd2fb3c

Please sign in to comment.