diff --git a/api/room/room.go b/api/room/room.go index 803a33e..91fd8ab 100644 --- a/api/room/room.go +++ b/api/room/room.go @@ -18,6 +18,7 @@ const ( StartType = "start" CloseType = "close" PingType = "ping" + ReplyType = "reply" JoinType = "join" ErrorType = "error" LeaveType = "leave" @@ -50,6 +51,14 @@ type Message struct { Content interface{} `json:"content"` } +func (m *Message) GetReply() *Message { + return &Message{ + Type: ReplyType, + RequestId: m.RequestId, + Content: map[string]string{}, + } +} + func (m *Message) ToString() string { bs, err := json.Marshal(m) if err != nil { diff --git a/serve/socket/socket.go b/serve/socket/socket.go index 792a4bf..1ebbe5f 100644 --- a/serve/socket/socket.go +++ b/serve/socket/socket.go @@ -69,6 +69,25 @@ func (s *socket) WriteMessage(messageType int, data []byte) error { return s.conn.WriteMessage(messageType, data) } +func (s *socket) WriteDataIgnoreError(data interface{}) { + err := s.WriteData(data) + if err != nil { + joinLog.WithError(err).Error("send message write message") + s.writeWebsocketError(roomApi.NewNetWorkTimeoutError(err.Error())) + } +} + +func (s *socket) WriteData(data interface{}) error { + s.rwLock.Lock() + defer s.rwLock.Unlock() + bs, err := json.Marshal(data) + if err != nil { + return roomApi.NewMessageContentError("send message marshal error %s", err.Error()) + } + + return s.conn.WriteMessage(websocket.TextMessage, bs) +} + func (s *socket) WriteJSON(v interface{}) error { s.rwLock.Lock() defer s.rwLock.Unlock() @@ -133,24 +152,14 @@ func readClientMessage(ctx context.Context, socket *socket, room roomApi.RemoteR return nil } + socket.WriteDataIgnoreError(msg.GetReply()) return nil } func onRoomMessage(ctx context.Context, socket *socket, room roomApi.RemoteRoom) error { select { case msg := <-room.OnMessage(): - bs, err := json.Marshal(msg) - if err != nil { - return roomApi.NewMessageContentError("send message marshal error %s", err.Error()) - } - - err = socket.WriteMessage(websocket.TextMessage, bs) - if err != nil { - joinLog.WithError(err).Error("send message write message") - socket.writeWebsocketError(roomApi.NewNetWorkTimeoutError(err.Error())) - return nil - } - + socket.WriteDataIgnoreError(msg) case <-room.Done(): return roomApi.NewRoomCloseError("room %s leave", room.GetRoomAddress().ID) case <-ctx.Done(): @@ -235,7 +244,7 @@ func (s *WebSocket) serveRoom(opt *roomApi.Info, connection *roomApi.Connection, if err != nil { retCode = "read_message_close" socket.writeWebsocketError(err) - joinLog.WithField("connection", connection.Address.ID).Info(err) + joinLog.WithField("readClientMessage", connection.Address.ID).Info(err) return } }