diff --git a/client/package.json b/client/package.json index bab37f8..96a6576 100644 --- a/client/package.json +++ b/client/package.json @@ -10,10 +10,10 @@ "@types/jest": "^24.9.1", "@types/node": "^12.12.67", "@types/react": "^16.9.53", - "mediasoup": "^3.7.0", - "mediasoup-client": "^3.6.29", "@types/react-beautiful-dnd": "^13.0.0", "antd": "^4.14.1", + "mediasoup": "^3.7.0", + "mediasoup-client": "^3.6.29", "node-sass": "^4.14.1", "react": "^16.13.1", "react-beautiful-dnd": "^13.1.0", @@ -21,7 +21,7 @@ "react-router-dom": "^5.2.0", "react-scripts": "^3.4.4", "react-youtube": "^7.13.0", - "socket.io-client": "^2.3.1", + "socket.io-client": "^4.0.1", "typescript": "^3.7.5" }, "scripts": { diff --git a/client/src/utils/rtc-socket-client.ts b/client/src/utils/rtc-socket-client.ts index 7665d6d..cb83a1d 100644 --- a/client/src/utils/rtc-socket-client.ts +++ b/client/src/utils/rtc-socket-client.ts @@ -7,7 +7,9 @@ export const openRtcSocket = ( redisClientId: string ): Promise => { return new Promise((resolve) => { - const socket = io(rtcServerDomain); + const socket = io(rtcServerDomain, { + path: '/rtcService', + }); socket.on('connect', () => { const clientData = { diff --git a/client/src/utils/session-socket-client.ts b/client/src/utils/session-socket-client.ts index 050d0be..884fb46 100644 --- a/client/src/utils/session-socket-client.ts +++ b/client/src/utils/session-socket-client.ts @@ -11,7 +11,9 @@ export const openSessionSocket = ( roomId?: string ): Promise => { return new Promise((resolve) => { - const socket = io(sessionServerDomain); + const socket = io(sessionServerDomain, { + path: '/sessionService', + }); socket.on('connect', () => { const clientData = { diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..846bb1e --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,59 @@ +# Spins up mock production environment +version: '3' +services: + # Reverse proxy/load balancer + nginx: + build: ./nginx + depends_on: + - rtcServer + - sessionServer + ports: + - '5000:80' + + # mediasoup voice server + rtcServer: + build: + context: ./rtc-server + dockerfile: Dockerfile + image: insync-rtc-service + volumes: + - /rtc-server/node_modules + expose: + - '4000' + + # socketio session server + sessionServer: + build: + context: ./server + dockerfile: Dockerfile + image: insync-session-service + volumes: + - /server/node_modules + expose: + - '5000' + depends_on: + - redisSocketIoAdapter + - redisRoomState + - redisClientRoomId + - redisWaitingRoomId + + # spin up 4 redis containers, 1 for socket.io adapter, 3 for holding room state + redisSocketIoAdapter: + image: redis:alpine + expose: + - '6379' + redisRoomState: + image: redis:alpine + command: --port 6380 + expose: + - '6380' + redisClientRoomId: + image: redis:alpine + command: --port 6381 + expose: + - '6381' + redisWaitingRoomId: + image: redis:alpine + command: --port 6382 + expose: + - '6382' diff --git a/nginx/Dockerfile b/nginx/Dockerfile new file mode 100644 index 0000000..b846f21 --- /dev/null +++ b/nginx/Dockerfile @@ -0,0 +1,2 @@ +FROM nginx:alpine +COPY nginx.conf /etc/nginx/nginx.conf diff --git a/nginx/nginx.conf b/nginx/nginx.conf new file mode 100644 index 0000000..345f623 --- /dev/null +++ b/nginx/nginx.conf @@ -0,0 +1,44 @@ +worker_processes 4; + +events { + worker_connections 1024; +} + +http { + server { + listen 80; + + location /rtcService/ { + # enable WebSockets + proxy_set_header Upgrade $http_upgrade; + proxy_set_header Connection "upgrade"; + proxy_http_version 1.1; + + proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; + proxy_set_header Host $host; + proxy_pass http://rtc_server; + } + + location /sessionService/ { + # enable WebSockets + proxy_set_header Upgrade $http_upgrade; + proxy_set_header Connection "upgrade"; + proxy_http_version 1.1; + + proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; + proxy_set_header Host $host; + proxy_pass http://socket_nodes; + } + } + + upstream rtc_server { + ip_hash; + server rtcServer:4000; + } + + upstream socket_nodes { + ip_hash; + server sessionServer:5000; + # TODO: add more physical nodes + } +} diff --git a/rtc-server/Dockerfile b/rtc-server/Dockerfile new file mode 100644 index 0000000..52d485f --- /dev/null +++ b/rtc-server/Dockerfile @@ -0,0 +1,23 @@ +# Base alpine image to install npm packages and compile ts -> js +FROM node:14-alpine AS build +COPY . . +RUN apk update \ + && apk add --no-cache --virtual .gyp \ + python \ + make \ + g++ \ + linux-headers \ + && npm install \ + && npm install mediasoup \ + && npm run build \ + && apk del .gyp + +# Main application image +FROM node:14-alpine +WORKDIR /server +COPY --from=build ./node_modules ./node_modules +COPY --from=build ./build ./build +RUN npm install pm2 -g +ENV NODE_ENV=production +EXPOSE 4000 +CMD ["pm2-runtime", "./build/server.js"] diff --git a/rtc-server/package.json b/rtc-server/package.json index 8cfede6..7988854 100644 --- a/rtc-server/package.json +++ b/rtc-server/package.json @@ -15,7 +15,7 @@ "dependencies": { "express": "^4.17.1", "mediasoup": "^3.7.4", - "socket.io": "^2.4.1" + "socket.io": "^4.0.1" }, "devDependencies": { "@types/express": "^4.17.11", diff --git a/rtc-server/server.ts b/rtc-server/server.ts index 69ddebd..1b77808 100644 --- a/rtc-server/server.ts +++ b/rtc-server/server.ts @@ -1,11 +1,18 @@ import express, { Application } from 'express'; -import http, { Server } from 'http'; -import socketIo, { Server as WebSocketServer } from 'socket.io'; +import http, { Server as HttpServer } from 'http'; +import { Server as WebSocketServer } from 'socket.io'; import attachSocketEvents from './utils/attach-socket-events'; const app: Application = express(); -const server: Server = http.createServer(app); -const io: WebSocketServer = socketIo(server, { serveClient: false }); +const server: HttpServer = http.createServer(app); +const io: WebSocketServer = new WebSocketServer(server, { + serveClient: false, + path: '/rtcService', + cors: { + methods: ['GET', 'PATCH', 'POST', 'PUT'], + origin: true, + }, +}); attachSocketEvents(io); const PORT: string | number = process.env.PORT || 4000; diff --git a/server/Dockerfile b/server/Dockerfile new file mode 100644 index 0000000..8dca9d2 --- /dev/null +++ b/server/Dockerfile @@ -0,0 +1,9 @@ +FROM node:14-alpine +WORKDIR /server +COPY . . +RUN npm install \ + && npm install -g pm2 \ + && npm run build +ENV NODE_ENV=production +EXPOSE 5000 +CMD ["pm2-runtime", "./build/server.js"]; diff --git a/server/docker-compose.yml b/server/docker-compose.yml new file mode 100644 index 0000000..945b4d6 --- /dev/null +++ b/server/docker-compose.yml @@ -0,0 +1,22 @@ +# Spins up all redis containers for development purposes +version: '3' +services: + redisSocketIoAdapter: + image: redis:alpine + ports: + - '6379:6379' + redisRoomState: + image: redis:alpine + command: --port 6380 + ports: + - '6380:6380' + redisClientRoomId: + image: redis:alpine + command: --port 6381 + ports: + - '6381:6381' + redisWaitingRoomId: + image: redis:alpine + command: --port 6382 + ports: + - '6382:6382' diff --git a/server/package.json b/server/package.json index 8c22028..522e4e1 100644 --- a/server/package.json +++ b/server/package.json @@ -16,11 +16,14 @@ "dependencies": { "express": "^4.17.1", "linked-list": "^2.1.0", - "socket.io": "^2.4.1" + "redis": "^3.1.2", + "socket.io": "^4.0.1", + "socket.io-redis": "^6.1.0" }, "devDependencies": { "@types/express": "^4.17.8", "@types/node": "^14.11.8", + "@types/redis": "^2.8.28", "@types/socket.io": "^2.1.11", "nodemon": "^2.0.4", "prettier": "^2.1.2", diff --git a/server/server.ts b/server/server.ts index 92e8dac..8fca09a 100644 --- a/server/server.ts +++ b/server/server.ts @@ -1,11 +1,26 @@ import express, { Application } from 'express'; -import http, { Server } from 'http'; -import socketIo, { Server as WebSocketServer } from 'socket.io'; +import http, { Server as HttpServer } from 'http'; +import { Server as WebSocketServer } from 'socket.io'; import attachSocketEvents from './utils/attach-socket-events'; +import { createAdapter } from 'socket.io-redis'; +import redisClients from './utils/redis-clients'; const app: Application = express(); -const server: Server = http.createServer(app); -const io: WebSocketServer = socketIo(server, { serveClient: false }); +const server: HttpServer = http.createServer(app); +const io: WebSocketServer = new WebSocketServer(server, { + serveClient: false, + path: '/sessionService', + cors: { + methods: ['GET', 'PATCH', 'POST', 'PUT'], + origin: true, + }, +}); +io.adapter( + createAdapter({ + pubClient: redisClients.adapterPubClient, + subClient: redisClients.adapterSubClient, + }) +); attachSocketEvents(io); const PORT: string | number = process.env.PORT || 5000; diff --git a/server/utils/Playlist.ts b/server/utils/Playlist.ts index 1534e70..760df77 100644 --- a/server/utils/Playlist.ts +++ b/server/utils/Playlist.ts @@ -15,15 +15,21 @@ interface PlaylistMap { /** * Playlist class implemented with Doubly-Linked-List and HashMap - * for O(1) node insertion/deletion/mutation */ export class Playlist { private list: typeof LinkedList; private map: PlaylistMap; // maps array position to node pointer - constructor() { - this.list = new LinkedList(); + // costly initialization but the most seamless refactor + constructor(playlistSnapshot: string[]) { this.map = {}; + this.list = new LinkedList(); + + playlistSnapshot.forEach((youtubeId: string, index: number) => { + const node = new VideoNode(youtubeId); + this.list.append(node); + this.map[index] = node; + }); } getYoutubeIDAtIndex(videoIndex: number): string { @@ -82,4 +88,4 @@ export class Playlist { } } -export default new Playlist(); +export default Playlist; diff --git a/server/utils/Rooms.ts b/server/utils/Rooms.ts index 875e99b..5698c4e 100644 --- a/server/utils/Rooms.ts +++ b/server/utils/Rooms.ts @@ -1,4 +1,13 @@ import { Playlist } from './Playlist'; +import redisClients from './redis-clients'; +const { + roomStateClient, + getRoomState, + clientRoomIdClient, + getClientRoomId, + waitingRoomIdClient, + getWaitingClientRoomId, +} = redisClients; export interface ClientMap { [clientId: string]: string; @@ -13,7 +22,7 @@ export interface Client { export interface Room { clients: Client[]; youtubeID: string; - playlist: Playlist; + playlist: string[]; roomType: string; hostId: string; waitingClients: { [socketId: string]: string }; @@ -24,172 +33,228 @@ export interface RoomMap { } /** - * Singleton class that aggregates/encapsulates all room information in WebSocketServer + * Singleton class that interfaces Redis state read/written to by all machines */ class Rooms { - private roomMap: RoomMap; - private clientMap: ClientMap; // maps any socket.id to its respective roomId - // used in disconnect to find if a user is in a waiting room of a room - private waitingList: { [socketId: string]: string }; - - constructor() { - this.roomMap = {}; - this.clientMap = {}; - this.waitingList = {}; // socketid -> roomId - } - - closeRoom(roomId: string) { - delete this.roomMap[roomId]; + closeRoom(roomId: string): void { + roomStateClient.DEL(roomId); } - addRoom(roomId: string, youtubeID: string, roomType: string) { - if (!this.roomMap[roomId]) { - const roomDetails = { + async addRoom( + roomId: string, + youtubeID: string, + roomType: string + ): Promise { + const roomExists: string | null = await getRoomState(roomId); + if (!roomExists) { + const roomDetails: Room = { clients: [], youtubeID, - playlist: new Playlist(), + playlist: [], roomType, hostId: roomId, waitingClients: {}, // socketId -> name }; - this.roomMap[roomId] = roomDetails; + roomStateClient.SET(roomId, JSON.stringify(roomDetails)); } } - getRoomClients(roomId: string): Client[] { - if (this.roomMap[roomId]) { - return this.roomMap[roomId].clients; + async getRoomClients(roomId: string): Promise { + const roomSnapshot: string | null = await getRoomState(roomId); + if (roomSnapshot) { + return JSON.parse(roomSnapshot).clients; } throw new Error('Room with this ID does not exist'); } - addClient(roomId: string, clientId: string, clientName: string): void { - if (this.clientMap[clientId]) { + async addClient( + roomId: string, + clientId: string, + clientName: string + ): Promise { + const clientExists: string | null = await getClientRoomId(clientId); + const roomExists: string | null = await getRoomState(roomId); + if (clientExists) { return; } - if (this.roomMap[roomId]) { + if (roomExists) { const newClient: Client = { id: clientId, name: clientName, isMuted: false, }; - this.roomMap[roomId].clients.push(newClient); - this.clientMap[clientId] = roomId; - const room = this.getRoom(roomId); - if (room.hostId.length === 0) room.hostId = clientId; + const roomSnapshot: Room = JSON.parse(roomExists); + roomSnapshot.clients.push(newClient); + if (!roomSnapshot.hostId) { + roomSnapshot.hostId = clientId; + } + + roomStateClient.SET(roomId, JSON.stringify(roomSnapshot)); + clientRoomIdClient.SET(clientId, roomId); } else { throw new Error('Room with this ID does not exist'); } } - removeClient(roomId: string, clientId: string) { - if (!this.clientMap[clientId]) { + async removeClient( + roomId: string, + clientId: string + ): Promise { + const clientExists: string | null = await getClientRoomId(clientId); + const roomExists: string | null = await getRoomState(roomId); + if (!clientExists) { return; } - if (this.roomMap[roomId]) { - const clientList: Client[] = this.getRoomClients(roomId); - for (let i = 0; i < clientList.length; i += 1) { - const client = clientList[i]; + if (roomExists) { + const roomSnapshot: Room = JSON.parse(roomExists); + for (let i = 0; i < roomSnapshot.clients.length; i += 1) { + const client: Client = roomSnapshot.clients[i]; if (client.id === clientId) { - clientList.splice(i, 1); - return clientList; + roomSnapshot.clients.splice(i, 1); + roomStateClient.SET(roomId, JSON.stringify(roomSnapshot)); } } + return roomSnapshot.clients; } else { throw new Error('Room with this ID does not exist'); } } - getClientRoomId(clientId: string): string { - if (this.clientMap[clientId]) { - return this.clientMap[clientId]; + async getClientRoomId(clientId: string): Promise { + const clientRoomId: string | null = await getClientRoomId(clientId); + if (clientRoomId) { + return clientRoomId; } throw new Error('This client ID does not exist'); } - getClient(clientId: string): Client { - const roomId: string = this.getClientRoomId(clientId); - const clientList: Client[] = this.getRoomClients(roomId); - const lookup: Client | undefined = clientList.find( + async getClient(clientId: string): Promise { + const clientRoomId: string | null = await getClientRoomId(clientId); + if (!clientRoomId) throw new Error('Client with this ID does not exist'); + const roomSnapshot: string | null = await getRoomState(clientRoomId); + if (!roomSnapshot) throw new Error('Room with this ID does not exist'); + + const lookup: Client | undefined = JSON.parse(roomSnapshot).clients.find( (client: Client) => client.id === clientId ); - if (!lookup) { - throw new Error('No clients with this ID exist in any rooms'); + throw new Error( + 'clientRoomIdClient has this client ID, but it does not exist in room!' + ); } else { return lookup; } } - isInWaitingList(clientId: string): boolean { - if (this.waitingList[clientId] === undefined) return false; - return true; + async isInWaitingList(clientId: string): Promise { + const isWaiting: string | null = await getWaitingClientRoomId(clientId); + return isWaiting ? true : false; } - addToWaitingList(socketId: string, roomId: string): void { - this.waitingList[socketId] = roomId; + mapClientIdToRoomId(socketId: string, roomId: string): void { + waitingRoomIdClient.SET(socketId, roomId); } - removeFromWaiting(socketId: string): void { - const roomId = this.waitingList[socketId]; - const room = this.getRoom(roomId); - delete this.waitingList[socketId]; - delete room.waitingClients[socketId]; + async removeFromWaiting(socketId: string): Promise { + const waitingClientRoomId: string | null = await getWaitingClientRoomId( + socketId + ); + if (!waitingClientRoomId) { + throw new Error('This client is not in a waiting list'); + } + const roomExists: string | null = await getRoomState(waitingClientRoomId); + if (!roomExists) throw new Error('Room with this ID does not exist'); + const roomSnapshot: Room = JSON.parse(roomExists); + delete roomSnapshot.waitingClients[socketId]; + roomStateClient.SET(waitingClientRoomId, JSON.stringify(roomSnapshot)); + waitingRoomIdClient.DEL(socketId); } - getWaitingClientRoomId(socketId: string): string { - return this.waitingList[socketId]; + async getWaitingClientRoomId(socketId: string): Promise { + const waitingClientRoomId: string | null = await getWaitingClientRoomId( + socketId + ); + if (!waitingClientRoomId) { + throw new Error('This client is not in a waiting list'); + } + return waitingClientRoomId; } - updateMute(id: string, roomId: string): Client[] { - const clients = this.getRoomClients(roomId); - for (const client of clients) { + async updateMute(id: string, roomId: string): Promise { + const roomExists: string | null = await getRoomState(roomId); + if (!roomExists) throw new Error('Room with this ID does not exist'); + const roomSnapshot: Room = JSON.parse(roomExists); + for (const client of roomSnapshot.clients) { if (client.id === id) client.isMuted = !client.isMuted; } - return clients; + roomStateClient.SET(roomId, JSON.stringify(roomSnapshot)); + return roomSnapshot.clients; } - getRoom(roomID: string) { - return this.roomMap[roomID]; + async getRoom(roomID: string): Promise { + const roomExists: string | null = await getRoomState(roomID); + if (!roomExists) throw new Error('Room with this ID does not exist'); + return JSON.parse(roomExists); } - setVideoLink(roomID: string, newYoutubeID: string): void { - if (this.roomMap[roomID]) { - this.roomMap[roomID].youtubeID = newYoutubeID; - } + async setVideoLink(roomID: string, newYoutubeID: string): Promise { + const roomExists: string | null = await getRoomState(roomID); + if (!roomExists) throw new Error('Room with this ID does not exist'); + const roomSnapshot: Room = JSON.parse(roomExists); + roomSnapshot.youtubeID = newYoutubeID; + roomStateClient.SET(roomID, JSON.stringify(roomSnapshot)); } - addVideo(roomID: string, youtubeID: string): void { - if (this.roomMap[roomID]) { - this.roomMap[roomID].playlist.addVideoToTail(youtubeID); - } + async addVideo(roomID: string, youtubeID: string): Promise { + const roomExists: string | null = await getRoomState(roomID); + if (!roomExists) throw new Error('Room with this ID does not exist'); + const roomSnapshot: Room = JSON.parse(roomExists); + const playlistMutator = new Playlist(roomSnapshot.playlist); + playlistMutator.addVideoToTail(youtubeID); + roomSnapshot.playlist = playlistMutator.getPlaylistIds(); + roomStateClient.SET(roomID, JSON.stringify(roomSnapshot)); } - deleteVideo(roomID: string, videoIndex: number): void { - if (this.roomMap[roomID]) { - this.roomMap[roomID].playlist.deleteVideoAtIndex(videoIndex); - } + async deleteVideo(roomID: string, videoIndex: number): Promise { + const roomExists: string | null = await getRoomState(roomID); + if (!roomExists) throw new Error('Room with this ID does not exist'); + const roomSnapshot: Room = JSON.parse(roomExists); + const playlistMutator = new Playlist(roomSnapshot.playlist); + playlistMutator.deleteVideoAtIndex(videoIndex); + roomSnapshot.playlist = playlistMutator.getPlaylistIds(); + roomStateClient.SET(roomID, JSON.stringify(roomSnapshot)); } - changeVideo(roomID: string, videoIndex: number): string { - if (this.roomMap[roomID]) { - const youtubeID = this.roomMap[roomID].playlist.getYoutubeIDAtIndex( - videoIndex - ); - this.setVideoLink(roomID, youtubeID); - - return youtubeID; - } else { - throw new Error('Room with this ID does not exist'); - } + async changeVideo(roomID: string, videoIndex: number): Promise { + const roomExists: string | null = await getRoomState(roomID); + if (!roomExists) throw new Error('Room with this ID does not exist'); + const roomSnapshot: Room = JSON.parse(roomExists); + const playlistMutator = new Playlist(roomSnapshot.playlist); + const youtubeId = playlistMutator.getYoutubeIDAtIndex(videoIndex); + roomSnapshot.youtubeID = youtubeId; + roomStateClient.SET(roomID, JSON.stringify(roomSnapshot)); + return youtubeId; } - moveVideo(roomID: string, oldIndex: number, newIndex: number): void { - this.roomMap[roomID].playlist.moveVideoToIndex(oldIndex, newIndex); + async moveVideo( + roomID: string, + oldIndex: number, + newIndex: number + ): Promise { + const roomExists: string | null = await getRoomState(roomID); + if (!roomExists) throw new Error('Room with this ID does not exist'); + const roomSnapshot: Room = JSON.parse(roomExists); + const playlistMutator = new Playlist(roomSnapshot.playlist); + playlistMutator.moveVideoToIndex(oldIndex, newIndex); + roomSnapshot.playlist = playlistMutator.getPlaylistIds(); + roomStateClient.SET(roomID, JSON.stringify(roomSnapshot)); } - getPlaylistVideoIds(roomId: string): string[] { - return this.roomMap[roomId].playlist.getPlaylistIds(); + async getPlaylistVideoIds(roomId: string): Promise { + const roomExists: string | null = await getRoomState(roomId); + if (!roomExists) throw new Error('Room with this ID does not exist'); + const roomSnapshot: Room = JSON.parse(roomExists); + return roomSnapshot.playlist; } } diff --git a/server/utils/attach-socket-events.ts b/server/utils/attach-socket-events.ts index 0db0126..96ad696 100644 --- a/server/utils/attach-socket-events.ts +++ b/server/utils/attach-socket-events.ts @@ -7,6 +7,8 @@ import { deletePlaylistItem, movePlaylistItem, } from './socket-notifier'; +import redisClients from './redis-clients'; +const { roomStateClient } = redisClients; /** * Attaches event listeners to socket instance @@ -31,8 +33,8 @@ async function attachSocketEvents(io: WebSocketServer) { if (canJoin) { socket.join(roomId); - Rooms.addRoom(roomId, youtubeID, roomType); - Rooms.addClient(roomId, clientId, clientName); + await Rooms.addRoom(roomId, youtubeID, roomType); + await Rooms.addClient(roomId, clientId, clientName); // TODO: not sure if this is listened to on client side socket.broadcast.to(roomId).emit( @@ -44,8 +46,14 @@ async function attachSocketEvents(io: WebSocketServer) { }) ); - io.to(roomId).emit('updateClientList', Rooms.getRoomClients(roomId)); - io.to(roomId).emit('updatePlaylist', Rooms.getPlaylistVideoIds(roomId)); + io.to(roomId).emit( + 'updateClientList', + await Rooms.getRoomClients(roomId) + ); + io.to(roomId).emit( + 'updatePlaylist', + await Rooms.getPlaylistVideoIds(roomId) + ); io.to(roomId).emit( 'notifyClient', createUserMessage(null, clientId, `${clientName} entered`) @@ -65,8 +73,8 @@ async function attachSocketEvents(io: WebSocketServer) { }); // -------------------------- YOUTUBE EVENTS -------------------------- - socket.on('videoStateChange', (data) => { - const client = Rooms.getClient(socket.id); + socket.on('videoStateChange', async (data) => { + const client = await Rooms.getClient(socket.id); let message = `${client.name} `; switch (data.type) { case 'PLAY_VIDEO': @@ -77,7 +85,7 @@ async function attachSocketEvents(io: WebSocketServer) { break; } - socket.broadcast.to(Rooms.getClientRoomId(client.id)).emit( + socket.broadcast.to(await Rooms.getClientRoomId(client.id)).emit( 'notifyClient', createClientNotifier('updateVideoState', { type: data.type, @@ -88,17 +96,17 @@ async function attachSocketEvents(io: WebSocketServer) { }, }) ); - io.to(Rooms.getClientRoomId(client.id)).emit( + io.to(await Rooms.getClientRoomId(client.id)).emit( 'notifyClient', createUserMessage(null, client.id, message) ); }); // -------------------------- MESSAGING EVENTS -------------------------- - socket.on('newMessage', (message) => { - const client = Rooms.getClient(socket.id); + socket.on('newMessage', async (message) => { + const client = await Rooms.getClient(socket.id); if (client) { - io.to(Rooms.getClientRoomId(client.id)).emit( + io.to(await Rooms.getClientRoomId(client.id)).emit( 'notifyClient', createUserMessage(client.name, client.id, message) ); @@ -106,44 +114,44 @@ async function attachSocketEvents(io: WebSocketServer) { }); // -------------------------- PLAYLIST EVENTS -------------------------- - socket.on('addToPlaylist', (youtubeId) => { - const client = Rooms.getClient(socket.id); - const roomId = Rooms.getClientRoomId(client.id); + socket.on('addToPlaylist', async (youtubeId) => { + const client = await Rooms.getClient(socket.id); + const roomId = await Rooms.getClientRoomId(client.id); const message = `${client.name} added a new video`; - Rooms.addVideo(roomId, youtubeId); - const newPlaylist: string[] = Rooms.getPlaylistVideoIds(roomId); + await Rooms.addVideo(roomId, youtubeId); + const newPlaylist: string[] = await Rooms.getPlaylistVideoIds(roomId); if (client) { io.to(roomId).emit('notifyClient', createPlaylistItem(newPlaylist)); - io.to(Rooms.getClientRoomId(client.id)).emit( + io.to(roomId).emit( 'notifyClient', createUserMessage(null, client.id, message) ); } }); - socket.on('deletePlaylistItem', (videoIndex: number) => { - const client = Rooms.getClient(socket.id); - const roomId = Rooms.getClientRoomId(client.id); + socket.on('deletePlaylistItem', async (videoIndex: number) => { + const client = await Rooms.getClient(socket.id); + const roomId = await Rooms.getClientRoomId(client.id); const message = `${client.name} deleted a playlist item`; Rooms.deleteVideo(roomId, videoIndex); - const newPlaylist: string[] = Rooms.getPlaylistVideoIds(roomId); + const newPlaylist: string[] = await Rooms.getPlaylistVideoIds(roomId); if (client) { io.to(roomId).emit('notifyClient', deletePlaylistItem(newPlaylist)); - io.to(Rooms.getClientRoomId(client.id)).emit( + io.to(roomId).emit( 'notifyClient', createUserMessage(null, client.id, message) ); } }); - socket.on('changeVideo', (videoIndex: number) => { - const client = Rooms.getClient(socket.id); - const roomId = Rooms.getClientRoomId(client.id); - const youtubeID = Rooms.changeVideo(roomId, videoIndex); + socket.on('changeVideo', async (videoIndex: number) => { + const client = await Rooms.getClient(socket.id); + const roomId = await Rooms.getClientRoomId(client.id); + const youtubeID = await Rooms.changeVideo(roomId, videoIndex); const message = `${client.name} changed the video`; io.to(roomId).emit( @@ -152,24 +160,24 @@ async function attachSocketEvents(io: WebSocketServer) { youtubeID, }) ); - io.to(Rooms.getClientRoomId(client.id)).emit( + io.to(roomId).emit( 'notifyClient', createUserMessage(null, client.id, message) ); }); - socket.on('insertVideoAtIndex', ({ oldIndex, newIndex }) => { - const client = Rooms.getClient(socket.id); - const roomId = Rooms.getClientRoomId(client.id); + socket.on('insertVideoAtIndex', async ({ oldIndex, newIndex }) => { + const client = await Rooms.getClient(socket.id); + const roomId = await Rooms.getClientRoomId(client.id); const message = `${client.name} swapped playlist item #${ oldIndex + 1 } with item #${newIndex + 1}`; - Rooms.moveVideo(roomId, oldIndex, newIndex); - const newPlaylist: string[] = Rooms.getPlaylistVideoIds(roomId); + await Rooms.moveVideo(roomId, oldIndex, newIndex); + const newPlaylist: string[] = await Rooms.getPlaylistVideoIds(roomId); io.to(roomId).emit('notifyClient', movePlaylistItem(newPlaylist)); - io.to(Rooms.getClientRoomId(client.id)).emit( + io.to(roomId).emit( 'notifyClient', createUserMessage(null, client.id, message) ); @@ -182,7 +190,7 @@ async function attachSocketEvents(io: WebSocketServer) { { clientName, roomId }: { clientName: string; roomId: string }, admitClient: () => Promise ) => { - const room = Rooms.getRoom(roomId); + const room = await Rooms.getRoom(roomId); const roomType = room.roomType; if (roomType === 'private') { // Update waiting clients and list in the room @@ -190,7 +198,8 @@ async function attachSocketEvents(io: WebSocketServer) { ...room.waitingClients, [socket.id]: clientName, }; - Rooms.addToWaitingList(socket.id, roomId); + roomStateClient.SET(roomId, JSON.stringify(room)); + Rooms.mapClientIdToRoomId(socket.id, roomId); io.to(room.hostId).emit('waitingClient', { waitingClients: room.waitingClients, @@ -203,33 +212,33 @@ async function attachSocketEvents(io: WebSocketServer) { socket.on( 'waitingResponse', - ({ socketId, status }: { socketId: string; status: string }) => { - if (status === 'accept') Rooms.removeFromWaiting(socketId); + async ({ socketId, status }: { socketId: string; status: string }) => { + if (status === 'accept') await Rooms.removeFromWaiting(socketId); io.to(socketId).emit(status); } ); // -------------------------- OTHER EVENTS -------------------------- - socket.on('mute', ({ id }) => { - const client = Rooms.getClient(socket.id); - const roomId = Rooms.getClientRoomId(client.id); - const newClients = Rooms.updateMute(id, roomId); + socket.on('mute', async ({ id }) => { + const client = await Rooms.getClient(socket.id); + const roomId = await Rooms.getClientRoomId(client.id); + const newClients = await Rooms.updateMute(id, roomId); let message = `${client.name} `; client.isMuted ? (message += 'muted') : (message += 'unmuted'); io.to(roomId).emit('updateClientList', newClients); - io.to(Rooms.getClientRoomId(client.id)).emit( + io.to(roomId).emit( 'notifyClient', createUserMessage(null, client.id, message) ); }); - socket.on('disconnect', () => { - if (!Rooms.isInWaitingList(socket.id)) { - const client = Rooms.getClient(socket.id); + socket.on('disconnect', async () => { + if (!(await Rooms.isInWaitingList(socket.id))) { + const client = await Rooms.getClient(socket.id); if (client === undefined) return; - const roomId = Rooms.getClientRoomId(client.id); - const room = Rooms.getRoom(roomId); + const roomId = await Rooms.getClientRoomId(client.id); + const room = await Rooms.getRoom(roomId); // Choose the next host if the socket that's disconnecting is the host if (socket.id === room.hostId) { @@ -243,21 +252,22 @@ async function attachSocketEvents(io: WebSocketServer) { clientIdLeft: '', }); } else { - Rooms.closeRoom(roomId); - return; + // i think the 1 client is not removed from client->roomId mapping here + return Rooms.closeRoom(roomId); } } - const newClientList = Rooms.removeClient(roomId, socket.id); + const newClientList = await Rooms.removeClient(roomId, socket.id); const message = `${client.name} left`; io.to(roomId).emit('updateClientList', newClientList); - io.to(Rooms.getClientRoomId(client.id)).emit( + // io.to(Rooms.getClientRoomId(client.id)).emit( + io.to(roomId).emit( 'notifyClient', createUserMessage(null, client.id, message) ); } else { - const roomId = Rooms.getWaitingClientRoomId(socket.id); - Rooms.removeFromWaiting(socket.id); - const room = Rooms.getRoom(roomId); + const roomId = await Rooms.getWaitingClientRoomId(socket.id); + await Rooms.removeFromWaiting(socket.id); + const room = await Rooms.getRoom(roomId); io.to(room.hostId).emit('updateWaitingClients', { waitingClientList: room.waitingClients, diff --git a/server/utils/redis-clients.ts b/server/utils/redis-clients.ts new file mode 100644 index 0000000..676c072 --- /dev/null +++ b/server/utils/redis-clients.ts @@ -0,0 +1,49 @@ +import { RedisClient } from 'redis'; +import { promisify } from 'util'; + +const adapterPubClient = new RedisClient({ + host: + process.env.NODE_ENV === 'production' + ? 'redisSocketIoAdapter' + : 'localhost', + port: 6379, +}); +const adapterSubClient = adapterPubClient.duplicate(); + +// roomId -> strinigified json blob of room state +const roomStateClient = new RedisClient({ + host: process.env.NODE_ENV === 'production' ? 'redisRoomState' : 'localhost', + port: 6380, +}); +const getRoomState = promisify(roomStateClient.get).bind(roomStateClient); + +// clientId -> roomId +const clientRoomIdClient = new RedisClient({ + host: + process.env.NODE_ENV === 'production' ? 'redisClientRoomId' : 'localhost', + port: 6381, +}); +const getClientRoomId = promisify(clientRoomIdClient.get).bind( + clientRoomIdClient +); + +// socketId/clientId -> roomId +const waitingRoomIdClient = new RedisClient({ + host: + process.env.NODE_ENV === 'production' ? 'redisWaitingRoomId' : 'localhost', + port: 6382, +}); +const getWaitingClientRoomId = promisify(waitingRoomIdClient.get).bind( + waitingRoomIdClient +); + +export default { + adapterPubClient, + adapterSubClient, + roomStateClient, + getRoomState, + clientRoomIdClient, + getClientRoomId, + waitingRoomIdClient, + getWaitingClientRoomId, +};