Skip to content

Commit

Permalink
docs(examples): 4th and final part of the "private messaging" example
Browse files Browse the repository at this point in the history
  • Loading branch information
darrachequesne committed Feb 16, 2021
1 parent 7247b40 commit 7467216
Show file tree
Hide file tree
Showing 6 changed files with 156 additions and 17 deletions.
31 changes: 31 additions & 0 deletions examples/private-messaging/server/cluster.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
const cluster = require("cluster");
const http = require("http");
const { setupMaster } = require("@socket.io/sticky");

const WORKERS_COUNT = 4;

if (cluster.isMaster) {
console.log(`Master ${process.pid} is running`);

for (let i = 0; i < WORKERS_COUNT; i++) {
cluster.fork();
}

cluster.on("exit", (worker) => {
console.log(`Worker ${worker.process.pid} died`);
cluster.fork();
});

const httpServer = http.createServer();
setupMaster(httpServer, {
loadBalancingMethod: "least-connection", // either "random", "round-robin" or "least-connection"
});
const PORT = process.env.PORT || 3000;

httpServer.listen(PORT, () =>
console.log(`server listening at http://localhost:${PORT}`)
);
} else {
console.log(`Worker ${process.pid} started`);
require("./index");
}
7 changes: 7 additions & 0 deletions examples/private-messaging/server/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
version: "3"

services:
redis:
image: redis:5
ports:
- "6379:6379"
36 changes: 22 additions & 14 deletions examples/private-messaging/server/index.js
Original file line number Diff line number Diff line change
@@ -1,23 +1,30 @@
const httpServer = require("http").createServer();
const Redis = require("ioredis");
const redisClient = new Redis();
const io = require("socket.io")(httpServer, {
cors: {
origin: "http://localhost:8080",
},
adapter: require("socket.io-redis")({
pubClient: redisClient,
subClient: redisClient.duplicate(),
}),
});

const { setupWorker } = require("@socket.io/sticky");
const crypto = require("crypto");
const randomId = () => crypto.randomBytes(8).toString("hex");

const { InMemorySessionStore } = require("./sessionStore");
const sessionStore = new InMemorySessionStore();
const { RedisSessionStore } = require("./sessionStore");
const sessionStore = new RedisSessionStore(redisClient);

const { InMemoryMessageStore } = require("./messageStore");
const messageStore = new InMemoryMessageStore();
const { RedisMessageStore } = require("./messageStore");
const messageStore = new RedisMessageStore(redisClient);

io.use((socket, next) => {
io.use(async (socket, next) => {
const sessionID = socket.handshake.auth.sessionID;
if (sessionID) {
const session = sessionStore.findSession(sessionID);
const session = await sessionStore.findSession(sessionID);
if (session) {
socket.sessionID = sessionID;
socket.userID = session.userID;
Expand All @@ -35,7 +42,7 @@ io.use((socket, next) => {
next();
});

io.on("connection", (socket) => {
io.on("connection", async (socket) => {
// persist session
sessionStore.saveSession(socket.sessionID, {
userID: socket.userID,
Expand All @@ -54,8 +61,12 @@ io.on("connection", (socket) => {

// fetch existing users
const users = [];
const [messages, sessions] = await Promise.all([
messageStore.findMessagesForUser(socket.userID),
sessionStore.findAllSessions(),
]);
const messagesPerUser = new Map();
messageStore.findMessagesForUser(socket.userID).forEach((message) => {
messages.forEach((message) => {
const { from, to } = message;
const otherUser = socket.userID === from ? to : from;
if (messagesPerUser.has(otherUser)) {
Expand All @@ -64,7 +75,8 @@ io.on("connection", (socket) => {
messagesPerUser.set(otherUser, [message]);
}
});
sessionStore.findAllSessions().forEach((session) => {

sessions.forEach((session) => {
users.push({
userID: session.userID,
username: session.username,
Expand Down Expand Up @@ -110,8 +122,4 @@ io.on("connection", (socket) => {
});
});

const PORT = process.env.PORT || 3000;

httpServer.listen(PORT, () =>
console.log(`server listening at http://localhost:${PORT}`)
);
setupWorker(io);
29 changes: 29 additions & 0 deletions examples/private-messaging/server/messageStore.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,35 @@ class InMemoryMessageStore extends MessageStore {
}
}

const CONVERSATION_TTL = 24 * 60 * 60;

class RedisMessageStore extends MessageStore {
constructor(redisClient) {
super();
this.redisClient = redisClient;
}

saveMessage(message) {
const value = JSON.stringify(message);
this.redisClient
.multi()
.rpush(`messages:${message.from}`, value)
.rpush(`messages:${message.to}`, value)
.expire(`messages:${message.from}`, CONVERSATION_TTL)
.expire(`messages:${message.to}`, CONVERSATION_TTL)
.exec();
}

findMessagesForUser(userID) {
return this.redisClient
.lrange(`messages:${userID}`, 0, -1)
.then((results) => {
return results.map((result) => JSON.parse(result));
});
}
}

module.exports = {
InMemoryMessageStore,
RedisMessageStore,
};
7 changes: 5 additions & 2 deletions examples/private-messaging/server/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,14 @@
"description": "",
"main": "index.js",
"scripts": {
"start": "node index.js"
"start": "node cluster.js"
},
"author": "Damien Arrachequesne <[email protected]>",
"license": "MIT",
"dependencies": {
"socket.io": "^3.1.1"
"@socket.io/sticky": "^1.0.0",
"ioredis": "^4.22.0",
"socket.io": "^3.1.1",
"socket.io-redis": "^6.0.1"
}
}
63 changes: 62 additions & 1 deletion examples/private-messaging/server/sessionStore.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,67 @@ class InMemorySessionStore extends SessionStore {
}
}

const SESSION_TTL = 24 * 60 * 60;
const mapSession = ([userID, username, connected]) =>
userID ? { userID, username, connected: connected === "true" } : undefined;

class RedisSessionStore extends SessionStore {
constructor(redisClient) {
super();
this.redisClient = redisClient;
}

findSession(id) {
return this.redisClient
.hmget(`session:${id}`, "userID", "username", "connected")
.then(mapSession);
}

saveSession(id, { userID, username, connected }) {
this.redisClient
.multi()
.hset(
`session:${id}`,
"userID",
userID,
"username",
username,
"connected",
connected
)
.expire(`session:${id}`, SESSION_TTL)
.exec();
}

async findAllSessions() {
const keys = new Set();
let nextIndex = 0;
do {
const [nextIndexAsStr, results] = await this.redisClient.scan(
nextIndex,
"MATCH",
"session:*",
"COUNT",
"100"
);
nextIndex = parseInt(nextIndexAsStr, 10);
results.forEach((s) => keys.add(s));
} while (nextIndex !== 0);
const commands = [];
keys.forEach((key) => {
commands.push(["hmget", key, "userID", "username", "connected"]);
});
return this.redisClient
.multi(commands)
.exec()
.then((results) => {
return results
.map(([err, session]) => (err ? undefined : mapSession(session)))
.filter((v) => !!v);
});
}
}
module.exports = {
InMemorySessionStore
InMemorySessionStore,
RedisSessionStore,
};

0 comments on commit 7467216

Please sign in to comment.