Skip to content

Commit

Permalink
backend: add timeouts for c.Send() and additional debugging information
Browse files Browse the repository at this point in the history
Additional debugging information will be used to debug the long-standing broadcast channel (line 96 in lobby/server.go) filling to the limit of 200 messages and therefore no clients being able to create a new game.
  • Loading branch information
mytja committed Sep 7, 2024
1 parent ed63c6e commit 867f5de
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 19 deletions.
26 changes: 17 additions & 9 deletions backend/internal/lobby/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,13 @@ func (c *clientImpl) Close() {

// Send sends lobby_messages
func (c *clientImpl) Send(msg *lobby_messages.LobbyMessage) {
c.send <- msg
c.logger.Debugw("sending message to c.send", "msg", msg)
select {
case c.send <- msg:
c.logger.Debugw("sent message to c.send", "msg", msg)
case <-time.After(5 * time.Second):
c.logger.Warnw("timeout sending message to c.send", "msg", msg)
}
}

// Checks if our ReadMessage error is a normal disconnect event
Expand All @@ -100,7 +106,7 @@ func (c *clientImpl) ReadPump() {
// We will handle disconnects here since websocket will
// error on this call when we receive it
if isUnexpectedClose(err) {
c.logger.Errorw("unexpected close on client socket",
c.logger.Debugw("unexpected close on client socket",
"id", c.user.ID, "remoteAddr", c.addr, zap.Error(err))
}

Expand Down Expand Up @@ -135,22 +141,22 @@ func (c *clientImpl) ReadPump() {
c.logger.Debugw("authenticating user")
token := u.LoginInfo.Token
if token == "" {
c.send <- &lobby_messages.LobbyMessage{Data: &lobby_messages.LobbyMessage_LoginResponse{LoginResponse: &lobby_messages.LoginResponse{Type: &lobby_messages.LoginResponse_Fail_{Fail: &lobby_messages.LoginResponse_Fail{}}}}}
c.Send(&lobby_messages.LobbyMessage{Data: &lobby_messages.LobbyMessage_LoginResponse{LoginResponse: &lobby_messages.LoginResponse{Type: &lobby_messages.LoginResponse_Fail_{Fail: &lobby_messages.LoginResponse_Fail{}}}}})
time.Sleep(100 * time.Millisecond)
c.Close()
return
}

user, err := c.server.GetDB().CheckTokenString(token)
if err != nil {
c.send <- &lobby_messages.LobbyMessage{Data: &lobby_messages.LobbyMessage_LoginResponse{LoginResponse: &lobby_messages.LoginResponse{Type: &lobby_messages.LoginResponse_Fail_{Fail: &lobby_messages.LoginResponse_Fail{}}}}}
c.Send(&lobby_messages.LobbyMessage{Data: &lobby_messages.LobbyMessage_LoginResponse{LoginResponse: &lobby_messages.LoginResponse{Type: &lobby_messages.LoginResponse_Fail_{Fail: &lobby_messages.LoginResponse_Fail{}}}}})
time.Sleep(100 * time.Millisecond)
c.Close()
return
}

c.user = user
c.send <- &lobby_messages.LobbyMessage{PlayerId: c.user.ID, Data: &lobby_messages.LobbyMessage_LoginResponse{LoginResponse: &lobby_messages.LoginResponse{Type: &lobby_messages.LoginResponse_Ok{Ok: &lobby_messages.LoginResponse_OK{}}}}}
c.Send(&lobby_messages.LobbyMessage{PlayerId: c.user.ID, Data: &lobby_messages.LobbyMessage_LoginResponse{LoginResponse: &lobby_messages.LoginResponse{Type: &lobby_messages.LoginResponse_Ok{Ok: &lobby_messages.LoginResponse_OK{}}}}})
authenticated = true
c.server.Authenticated(c)
break
Expand Down Expand Up @@ -190,32 +196,34 @@ func (c *clientImpl) SendPump() {
err := c.conn.SetWriteDeadline(time.Now().Add(writeTimeout))
if err != nil {
c.logger.Errorw("error while setting write deadline", "err", err)
return
break
}

writer, err := c.conn.NextWriter(websocket.BinaryMessage)
if err != nil {
c.logger.Warnw("error while getting NextWriter for client",
"id", c.user.ID, "remoteAddr", c.addr, zap.Error(err))
return
break
}

rawMessage, err := proto.Marshal(message)
if err != nil {
c.logger.Errorw("error while marshalling protobuf message", "err", err)
return
break
}
_, err = writer.Write(rawMessage)
if err != nil {
c.logger.Errorw("error while writing to the writer", "err", err)
return
break
}
// We need to close the writer so that our message
// gets flushed to the client
if err = writer.Close(); err != nil {
c.logger.Warnw("error while closing client writer",
"id", c.user.ID, "remoteAddr", c.addr, zap.Error(err))
}

c.logger.Debugw("sent message", "id", c.user.ID, "remoteAddr", c.addr, "msg", message)
}

c.logger.Debugw("exiting client send pump", "id", c.user.ID, "remoteAddr", c.addr)
Expand Down
16 changes: 10 additions & 6 deletions backend/internal/lobby/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"errors"
"fmt"
"github.com/gorilla/websocket"
"github.com/mytja/Tarok/backend/internal/consts"
"github.com/mytja/Tarok/backend/internal/events"
"github.com/mytja/Tarok/backend/internal/helpers"
"github.com/mytja/Tarok/backend/internal/lobby_messages"
"github.com/mytja/Tarok/backend/internal/sql"
Expand All @@ -12,9 +14,6 @@ import (
"net/http"
"os"
"time"

"github.com/mytja/Tarok/backend/internal/consts"
"github.com/mytja/Tarok/backend/internal/events"
)

var (
Expand Down Expand Up @@ -88,8 +87,10 @@ func (s *serverImpl) Run() {
}

// če je v igri, naj prvo pošljemo sporočilo onlineStatus=1, nato ga pa še čisto do konca disconnectamo
time.Sleep(100 * time.Millisecond)
events.Publish("lobby.onlineStatus", disconnect.GetUserID(), int32(0))
go func() {
time.Sleep(100 * time.Millisecond)
s.ChangeOnlineStatus(disconnect.GetUserID(), int32(0))
}()

s.logger.Debugw("done disconnecting the user")
case broadcast := <-s.broadcast:
Expand All @@ -98,8 +99,10 @@ func (s *serverImpl) Run() {
//broadcast.msg.PlayerId = broadcast.excludeClient
for _, client := range s.clients {
s.logger.Debugw("found client to broadcast message to", "clientId", client.GetClientID())
client.Send(broadcast)
go client.Send(broadcast) // make it not block others, it's not that important
}

s.logger.Debugw("broadcast has ended", "msg", broadcast)
}
}
}
Expand Down Expand Up @@ -525,6 +528,7 @@ func (s *serverImpl) handleDisconnect() {
}

func (s *serverImpl) Broadcast(msg *lobby_messages.LobbyMessage) {
s.logger.Debugw("received broadcast", "msg", msg)
s.broadcast <- msg
}

Expand Down
14 changes: 10 additions & 4 deletions backend/internal/ws/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,13 @@ func (c *clientImpl) Close() {

// Send sends messages
func (c *clientImpl) Send(msg *messages.Message) {
c.send <- msg
c.logger.Debugw("sending message to c.send", "msg", msg)
select {
case c.send <- msg:
c.logger.Debugw("sent message to c.send", "msg", msg)
case <-time.After(5 * time.Second):
c.logger.Warnw("timeout sending message to c.send", "msg", msg)
}
}

// Checks if our ReadMessage error is a normal disconnect event
Expand Down Expand Up @@ -167,20 +173,20 @@ func (c *clientImpl) ReadPump() {
c.logger.Debugw("authenticating user")
token := u.LoginInfo.Token
if token == "" {
c.send <- &messages.Message{Data: &messages.Message_LoginResponse{LoginResponse: &messages.LoginResponse{Type: &messages.LoginResponse_Fail_{Fail: &messages.LoginResponse_Fail{}}}}}
c.Send(&messages.Message{Data: &messages.Message_LoginResponse{LoginResponse: &messages.LoginResponse{Type: &messages.LoginResponse_Fail_{Fail: &messages.LoginResponse_Fail{}}}}})
c.Close()
return
}

user, err := c.server.GetDB().CheckTokenString(token)
if err != nil {
c.send <- &messages.Message{Data: &messages.Message_LoginResponse{LoginResponse: &messages.LoginResponse{Type: &messages.LoginResponse_Fail_{Fail: &messages.LoginResponse_Fail{}}}}}
c.Send(&messages.Message{Data: &messages.Message_LoginResponse{LoginResponse: &messages.LoginResponse{Type: &messages.LoginResponse_Fail_{Fail: &messages.LoginResponse_Fail{}}}}})
c.Close()
return
}

c.user = user
c.send <- &messages.Message{Username: c.user.Name, PlayerId: c.user.ID, Data: &messages.Message_LoginResponse{LoginResponse: &messages.LoginResponse{Type: &messages.LoginResponse_Ok{Ok: &messages.LoginResponse_OK{}}}}}
c.Send(&messages.Message{Username: c.user.Name, PlayerId: c.user.ID, Data: &messages.Message_LoginResponse{LoginResponse: &messages.LoginResponse{Type: &messages.LoginResponse_Ok{Ok: &messages.LoginResponse_OK{}}}}})
c.server.Authenticated(c)
break
case *messages.Message_Licitiranje:
Expand Down

0 comments on commit 867f5de

Please sign in to comment.