Skip to content

Commit

Permalink
feat: socket server improvement (#1021)
Browse files Browse the repository at this point in the history
feat: cleanup invalid files
  • Loading branch information
abretonc7s authored Sep 19, 2024
1 parent 0869b22 commit 1e41819
Show file tree
Hide file tree
Showing 19 changed files with 364 additions and 91 deletions.
4 changes: 2 additions & 2 deletions packages/sdk-socket-server-next/.env.sample
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
SEGMENT_API_KEY_PRODUCTION=123456789
SEGMENT_API_KEY_DEBUG=123456789
# REDIS_URL=redis://redis:6379/0
# Example REDIS_NODES format: "redis://host1:6379,redis://host2:6379"
REDIS_NODES=redis://localhost:6380,redis://localhost:6381,redis://localhost:6382
# OR on different ports: REDIS_NODES=redis://localhost:6380,redis://localhost:6381,redis://localhost:6382
REDIS_NODES=redis://localhost:6379
REDIS_PASSWORD=redis_password
REDIS_TLS=false
RATE_LIMITER=false
Expand Down
8 changes: 8 additions & 0 deletions packages/sdk-socket-server-next/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,14 @@ This guide provides instructions for setting up and debugging the SDK socket ser
- Docker and Docker Compose installed (for Docker-based setup)
- Ngrok account and CLI tool installed (for external access testing)

## QuickStart

```bash
# start local redis server
docker compose up -d cache
yarn debug
```

## Local Setup

### Initial Configuration
Expand Down
2 changes: 1 addition & 1 deletion packages/sdk-socket-server-next/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ services:
image: redis:7.2-alpine
command: redis-server --maxmemory 100mb --maxmemory-policy volatile-lru --loglevel debug
ports:
- "${DOCKER_ENV_LOCAL_REDIS_PORT}:6379"
- "${DOCKER_ENV_LOCAL_REDIS_PORT:-6379}:6379"

nginx:
image: nginx:latest
Expand Down
3 changes: 2 additions & 1 deletion packages/sdk-socket-server-next/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@
"build": "tsc",
"build:post-tsc": "echo 'N/A'",
"build:pre-tsc": "echo 'N/A'",
"typecheck": "tsc --noEmit",
"clean": "rimraf dist",
"docker:redis": "docker compose up redis-cluster-init",
"docker:redis:check": "yarn docker:redis && docker compose up check-redis",
"debug": "NODE_ENV=development nodemon --exec 'ts-node --transpile-only src/index.ts'",
"debug": "nodemon --exec 'NODE_ENV=development ts-node --transpile-only src/index.ts'",
"debug:redis": "cross-env NODE_ENV=development ts-node --transpile-only src/redis-check.ts",
"docker:debug": "yarn docker:redis && docker compose up appdev",
"lint": "yarn lint:eslint && yarn lint:misc --check",
Expand Down
4 changes: 3 additions & 1 deletion packages/sdk-socket-server-next/src/api-config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ import {
redisCluster,
redisTLS,
} from './config';
import { logger } from './logger';
import { getLogger } from './logger';

const logger = getLogger();

// Initialize Redis Cluster client
let redisNodes: {
Expand Down
8 changes: 7 additions & 1 deletion packages/sdk-socket-server-next/src/config.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
import { logger } from './logger';
import { createLogger, setLogger } from './logger';

export const isDevelopment: boolean = process.env.NODE_ENV === 'development';

// Initialize the logger
const logger = createLogger(isDevelopment);
setLogger(logger);

export const REDIS_DEBUG_LOGS: boolean =
process.env.REDIS_DEBUG_LOGS === 'true';
export const EVENTS_DEBUG_LOGS: boolean =
Expand All @@ -16,6 +21,7 @@ export const MAX_CLIENTS_PER_ROOM = 2;
export const config = {
msgExpiry: HOUR_IN_SECONDS,
channelExpiry: THIRTY_DAYS_IN_SECONDS,
rejectedChannelExpiry: 5 * 60, // 5min for dapp to fetch the message
};

if (process.env.CHANNEL_EXPIRY) {
Expand Down
6 changes: 4 additions & 2 deletions packages/sdk-socket-server-next/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,18 @@ import dotenv from 'dotenv';
// Dotenv must be loaded before importing local files
dotenv.config();

// Load config
import { instrument } from '@socket.io/admin-ui';
import packageJson from '../package.json';
import { isDevelopment, withAdminUI } from './config';
import { analytics, app } from './api-config';
import { logger } from './logger';
import { getLogger } from './logger';
import { extractMetrics } from './metrics';
import { configureSocketServer } from './socket-config';
import { cleanupAndExit } from './utils';
import { isDevelopment, withAdminUI } from './config';

const server = http.createServer(app);
const logger = getLogger();

// Register event listeners for process termination events
process.on('SIGINT', async () => {
Expand Down
46 changes: 27 additions & 19 deletions packages/sdk-socket-server-next/src/logger.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import winston, { format } from 'winston';
import { isDevelopment } from './config';

const customFormat = format.printf((ti) => {
const { level, message, timestamp } = ti;
Expand Down Expand Up @@ -32,27 +31,36 @@ const customFormat = format.printf((ti) => {
})
.join(' ') ?? '';

if (isDevelopment) {
const searchContext = message;
if (searchContext.indexOf('wallet') !== -1) {
msg += `\x1b[36m${message} ${extras}\x1b[0m`;
// eslint-disable-next-line no-negated-condition
} else if (searchContext.indexOf('dapp') !== -1) {
msg += `\x1b[35m${message} ${extras}\x1b[0m`;
} else {
msg += `${message} ${extras}`;
}
const searchContext = message;
if (searchContext.indexOf('wallet') !== -1) {
msg += `\x1b[36m${message} ${extras}\x1b[0m`;
// eslint-disable-next-line no-negated-condition
} else if (searchContext.indexOf('dapp') !== -1) {
msg += `\x1b[35m${message} ${extras}\x1b[0m`;
} else {
msg += `${message} ${extras}`;
}
return msg;
});

export const logger = winston.createLogger({
level: 'debug',
format: winston.format.combine(winston.format.timestamp(), customFormat),
transports: [
new winston.transports.Console(),
// You can also add file transport or any other transport here
],
});
// Create a function to initialize the logger
export function createLogger(isDevelopment: boolean) {
return winston.createLogger({
level: isDevelopment ? 'debug' : 'info',
format: winston.format.combine(winston.format.timestamp(), customFormat),
transports: [
new winston.transports.Console(),
// You can also add file transport or any other transport here
],
});
}

let _logger: winston.Logger;

export function getLogger(): winston.Logger {
return _logger;
}

export function setLogger(newLogger: winston.Logger): void {
_logger = newLogger;
}
33 changes: 27 additions & 6 deletions packages/sdk-socket-server-next/src/protocol/handleAck.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import { Server, Socket } from 'socket.io';
import { pubClient } from '../api-config';
import { logger } from '../logger';
import { getLogger } from '../logger';
import { ClientType } from '../socket-config';
import { QueuedMessage } from './handleMessage';

const logger = getLogger();

export type ACKParams = {
io: Server;
socket: Socket;
Expand All @@ -16,15 +18,19 @@ export type ACKParams = {
export const handleAck = async ({
channelId,
ackId,
socket,
clientType,
}: ACKParams) => {
const queueKey = `queue:${channelId}:${clientType}`;
let messages: any[] = [];

const socketId = socket.id;
const clientIp = socket.request.socket.remoteAddress;
try {
// Retrieve all messages to find and remove the specified one
messages = await pubClient.lrange(queueKey, 0, -1);
logger.debug(
`handleAck channelId=${channelId} -- Messages in ${clientType} queue: ${messages.length}`,
`[handleAck] channelId=${channelId} -- Messages in ${clientType} queue: ${messages.length}`,
messages,
);
const index = messages.findIndex((msg) => {
Expand All @@ -34,7 +40,7 @@ export const handleAck = async ({
return parsed.ackId === ackId;
} catch (e) {
logger.error(
`handleAck channelId=${channelId} -- Error parsing message`,
`[handleAck] channelId=${channelId} -- Error parsing message`,
msg,
e,
);
Expand All @@ -43,19 +49,34 @@ export const handleAck = async ({
});
if (index === -1) {
logger.warn(
`handleAck channelId=${channelId} -- Message ${ackId} not found in ${clientType} queue.`,
`[handleAck] channelId=${channelId} -- Message ${ackId} not found in ${clientType} queue.`,
{
channelId,
socketId,
clientIp,
},
);
} else {
const placeholder = `TO_REMOVE_${new Date().getTime()}`; // Unique placeholder
await pubClient.lset(queueKey, index, placeholder); // Set the message at index to unique placeholder
await pubClient.lrem(queueKey, 1, placeholder); // Remove the unique placeholder
logger.info(
`handleAck channelId=${channelId} -- Message ${ackId} removed from ${clientType} queue.`,
`[handleAck] channelId=${channelId} -- Message ${ackId} removed from ${clientType} queue.`,
{
channelId,
socketId,
clientIp,
},
);
}
} catch (error) {
logger.error(
`handleAck channelId=${channelId} -- Error removing message: ${error}`,
`[handleAck] channelId=${channelId} -- Error removing message: ${error}`,
{
channelId,
socketId,
clientIp,
},
);
}
};
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
import { Server, Socket } from 'socket.io';
import { pubClient } from '../api-config';
import { config } from '../config';
import { getLogger } from '../logger';
import { ChannelConfig } from './handleJoinChannel';

const logger = getLogger();

export type ChannelRejectedParams = {
io: Server;
socket: Socket;
channelId: string;
};

/**
* Can only be called by the wallet after a connection attempt.
*
* @param params
* @param callback
*/
export const handleChannelRejected = async (
params: ChannelRejectedParams,
callback?: (error: string | null, result?: unknown) => void,
) => {
const { channelId, socket } = params;

const socketId = socket.id;
const clientIp = socket.request.socket.remoteAddress;

const channelConfigKey = `channel_config:${channelId}`;
const existingConfig = await pubClient.get(channelConfigKey);
let channelConfig: ChannelConfig | null = existingConfig
? (JSON.parse(existingConfig) as ChannelConfig)
: null;

if (channelConfig) {
logger.debug(
`[handleChannelRejected] Channel already exists: ${channelId}`,
JSON.stringify(channelConfig),
);

// ignore if already ready
if (channelConfig.ready) {
logger.warn(
`[handleChannelRejected] received rejected for channel that is already ready: ${channelId}`,
{
channelId,
socketId,
clientIp,
},
);
return;
}

// channel config already exists but keyexchange hasn't happened, so we can just update the existing one as rejected with short ttl.
channelConfig.rejected = true;
channelConfig.updatedAt = Date.now();
} else {
// this condition can occur if the dapp (ios) was disconnected before the channel config was created
channelConfig = {
clients: {
wallet: socketId,
dapp: '',
},
rejected: true,
createdAt: Date.now(),
updatedAt: Date.now(),
};
}

logger.info(
`[handleChannelRejected] updating channel config for channelId=${channelId}`,
{
channelId,
socketId,
clientIp,
},
);

// Update redis channel config to inform dApp of rejection
await pubClient.setex(
channelConfigKey,
config.rejectedChannelExpiry,
JSON.stringify(channelConfig),
);

// Also broadcast to dapp if it is connected
socket.broadcast.to(channelId).emit(`rejected-${channelId}`, { channelId });

// Edit redis channel config to set to terminated for sdk to pick up
callback?.(null, { success: true });
};
12 changes: 7 additions & 5 deletions packages/sdk-socket-server-next/src/protocol/handleCheckRoom.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import { validate } from 'uuid';
import { Server, Socket } from 'socket.io';
import { logger } from '../logger';
import { getLogger } from '../logger';
import { pubClient } from '../api-config';

const logger = getLogger();

export type CheckRoomParams = {
channelId: string;
socket: Socket;
Expand All @@ -23,8 +25,8 @@ export const handleCheckRoom = async ({
const clientIp = socket.request.socket.remoteAddress;

if (!validate(channelId)) {
logger.info(`check_room ${channelId} invalid`, {
id: channelId,
logger.info(`[check_room] ${channelId} invalid`, {
channelId,
clientIp,
socketId,
});
Expand All @@ -37,8 +39,8 @@ export const handleCheckRoom = async ({
(await pubClient.hget('channels', channelId)) ?? undefined;

logger.info(
`check_room occupancy=${occupancy}, channelOccupancy=${channelOccupancy}`,
{ socketId, clientIp, id: channelId },
`[check_room] occupancy=${occupancy}, channelOccupancy=${channelOccupancy}`,
{ socketId, clientIp, channelId },
);
// Callback with null as the first argument, meaning "no error"
return callback(null, { occupancy, channelOccupancy });
Expand Down
Loading

0 comments on commit 1e41819

Please sign in to comment.