-
Notifications
You must be signed in to change notification settings - Fork 0
/
chatroom.go
112 lines (96 loc) · 2.58 KB
/
chatroom.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
package peers
import (
"context"
"encoding/json"
"github.com/libp2p/go-libp2p/core/peer"
pubsub "github.com/libp2p/go-libp2p-pubsub"
)
// ChatRoomBufSize is the number of incoming messages to buffer for each topic.
const ChatRoomBufSize = 65535
// ChatRoom represents a subscription to a single PubSub topic. Messages
// can be published to the topic with ChatRoom.Publish, and received
// messages are pushed to the Messages channel.
type ChatRoom struct {
// Messages is a channel of messages received from other peers in the chat room
Messages chan *ChatMessage
ctx context.Context
ps *pubsub.PubSub
topic *pubsub.Topic
sub *pubsub.Subscription
roomName string
self peer.ID
nick string
}
// ChatMessage gets converted to/from JSON and sent in the body of pubsub messages.
type ChatMessage struct {
Message Msg
SenderID string
SenderNick string
}
// joinChatRoom tries to subscribe to the PubSub topic for the room name, returning
// a ChatRoom on success.
func joinChatRoom(ctx context.Context, ps *pubsub.PubSub, selfID peer.ID, nickname string, roomName string) (*ChatRoom, error) {
// join the pubsub topic
topic, err := ps.Join(topicName(roomName))
if err != nil {
return nil, err
}
// and subscribe to it
sub, err := topic.Subscribe(pubsub.WithBufferSize(1024))
if err != nil {
return nil, err
}
cr := &ChatRoom{
ctx: ctx,
ps: ps,
topic: topic,
sub: sub,
self: selfID,
nick: nickname,
roomName: roomName,
Messages: make(chan *ChatMessage, ChatRoomBufSize),
}
// start reading messages from the subscription in a loop
go cr.readLoop()
return cr, nil
}
// Publish sends a message to the pubsub topic.
func (cr *ChatRoom) Publish(message Msg) error {
m := ChatMessage{
Message: message,
SenderID: cr.self.String(),
SenderNick: cr.nick,
}
msgBytes, err := json.Marshal(m)
if err != nil {
return err
}
return cr.topic.Publish(cr.ctx, msgBytes)
}
func (cr *ChatRoom) ListPeers() []peer.ID {
return cr.ps.ListPeers(topicName(cr.roomName))
}
// readLoop pulls messages from the pubsub topic and pushes them onto the Messages channel.
func (cr *ChatRoom) readLoop() {
for {
msg, err := cr.sub.Next(cr.ctx)
if err != nil {
close(cr.Messages)
return
}
// only forward messages delivered by others
if msg.ReceivedFrom == cr.self {
continue
}
cm := new(ChatMessage)
err = json.Unmarshal(msg.Data, cm)
if err != nil {
continue
}
// send valid messages onto the Messages channel
cr.Messages <- cm
}
}
func topicName(roomName string) string {
return "chat-room:" + roomName
}