Skip to content

Commit

Permalink
add message reply.
Browse files Browse the repository at this point in the history
  • Loading branch information
wangdashuaihenshuai committed Dec 26, 2023
1 parent 4495fb3 commit fcdbdc9
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 13 deletions.
9 changes: 9 additions & 0 deletions api/room/room.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ const (
StartType = "start"
CloseType = "close"
PingType = "ping"
ReplyType = "reply"
JoinType = "join"
ErrorType = "error"
LeaveType = "leave"
Expand Down Expand Up @@ -50,6 +51,14 @@ type Message struct {
Content interface{} `json:"content"`
}

func (m *Message) GetReply() *Message {
return &Message{
Type: ReplyType,
RequestId: m.RequestId,
Content: map[string]string{},
}
}

func (m *Message) ToString() string {
bs, err := json.Marshal(m)
if err != nil {
Expand Down
35 changes: 22 additions & 13 deletions serve/socket/socket.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,25 @@ func (s *socket) WriteMessage(messageType int, data []byte) error {
return s.conn.WriteMessage(messageType, data)
}

func (s *socket) WriteDataIgnoreError(data interface{}) {
err := s.WriteData(data)
if err != nil {
joinLog.WithError(err).Error("send message write message")
s.writeWebsocketError(roomApi.NewNetWorkTimeoutError(err.Error()))
}
}

func (s *socket) WriteData(data interface{}) error {
s.rwLock.Lock()
defer s.rwLock.Unlock()
bs, err := json.Marshal(data)
if err != nil {
return roomApi.NewMessageContentError("send message marshal error %s", err.Error())
}

return s.conn.WriteMessage(websocket.TextMessage, bs)
}

func (s *socket) WriteJSON(v interface{}) error {
s.rwLock.Lock()
defer s.rwLock.Unlock()
Expand Down Expand Up @@ -133,24 +152,14 @@ func readClientMessage(ctx context.Context, socket *socket, room roomApi.RemoteR
return nil
}

socket.WriteDataIgnoreError(msg.GetReply())
return nil
}

func onRoomMessage(ctx context.Context, socket *socket, room roomApi.RemoteRoom) error {
select {
case msg := <-room.OnMessage():
bs, err := json.Marshal(msg)
if err != nil {
return roomApi.NewMessageContentError("send message marshal error %s", err.Error())
}

err = socket.WriteMessage(websocket.TextMessage, bs)
if err != nil {
joinLog.WithError(err).Error("send message write message")
socket.writeWebsocketError(roomApi.NewNetWorkTimeoutError(err.Error()))
return nil
}

socket.WriteDataIgnoreError(msg)
case <-room.Done():
return roomApi.NewRoomCloseError("room %s leave", room.GetRoomAddress().ID)
case <-ctx.Done():
Expand Down Expand Up @@ -235,7 +244,7 @@ func (s *WebSocket) serveRoom(opt *roomApi.Info, connection *roomApi.Connection,
if err != nil {
retCode = "read_message_close"
socket.writeWebsocketError(err)
joinLog.WithField("connection", connection.Address.ID).Info(err)
joinLog.WithField("readClientMessage", connection.Address.ID).Info(err)
return
}
}
Expand Down

0 comments on commit fcdbdc9

Please sign in to comment.