diff --git a/packages/sharder/CHANGELOG.md b/packages/sharder/CHANGELOG.md new file mode 100644 index 0000000..e4d87c4 --- /dev/null +++ b/packages/sharder/CHANGELOG.md @@ -0,0 +1,4 @@ +# Change Log + +All notable changes to this project will be documented in this file. +See [Conventional Commits](https://conventionalcommits.org) for commit guidelines. diff --git a/packages/sharder/README.md b/packages/sharder/README.md new file mode 100644 index 0000000..665fc56 --- /dev/null +++ b/packages/sharder/README.md @@ -0,0 +1,3 @@ +# `@discordjs/sharder` + +> The sharding manager module for Discord.js diff --git a/packages/sharder/__tests__/.gitkeep b/packages/sharder/__tests__/.gitkeep new file mode 100644 index 0000000..e69de29 diff --git a/packages/sharder/package.json b/packages/sharder/package.json new file mode 100644 index 0000000..820a4e5 --- /dev/null +++ b/packages/sharder/package.json @@ -0,0 +1,60 @@ +{ + "name": "@discordjs/sharder", + "version": "0.1.1-canary.0", + "description": "The sharding manager for discord.js", + "private": true, + "scripts": { + "test": "echo \"Error: run tests from root\" && exit 1", + "build": "tsup && tsc --emitDeclarationOnly" + }, + "main": "./dist/index.js", + "module": "./dist/index.mjs", + "typings": "./dist/index.d.ts", + "exports": { + "import": "./dist/index.mjs", + "require": "./dist/index.js" + }, + "directories": { + "lib": "src", + "test": "__tests__" + }, + "files": [ + "dist" + ], + "contributors": [ + "Crawl ", + "Amish Shah ", + "SpaceEEC ", + "Vlad Frangu ", + "Antonio Roman " + ], + "license": "Apache-2.0", + "keywords": [ + "discord", + "api", + "sharder", + "shardmanager", + "discordapp" + ], + "repository": { + "type": "git", + "url": "git+https://github.com/discordjs/discord.js-modules.git" + }, + "bugs": { + "url": "https://github.com/discordjs/discord.js-modules/issues" + }, + "homepage": "https://github.com/discordjs/discord.js-modules/tree/main/packages/sharder", + "dependencies": { + "@discordjs/rest": "^0.1.1-canary.0", + "@sapphire/utilities": "^3.1.0", + "discord-api-types": "^0.25.2", + "tslib": "^2.3.1", + "zod": "^3.11.6" + }, + "engines": { + "node": ">=16.0.0" + }, + "publishConfig": { + "access": "public" + } +} diff --git a/packages/sharder/src/ShardingManager.ts b/packages/sharder/src/ShardingManager.ts new file mode 100644 index 0000000..7d8c47f --- /dev/null +++ b/packages/sharder/src/ShardingManager.ts @@ -0,0 +1,315 @@ +import { REST } from '@discordjs/rest'; +import { chunk } from '@sapphire/utilities'; +import { EventEmitter } from 'node:events'; +import { cpus } from 'node:os'; +import { setTimeout as sleep } from 'node:timers/promises'; +import { z } from 'zod'; +import type { IMessageHandlerConstructor } from './messages/base/IMessageHandler'; +import { JsonMessageHandler } from './messages/JsonMessageHandler'; +import type { BaseProcessClusterHandlerOptions } from './server/clusters/process/BaseProcessClusterHandler'; +import { ForkProcessClusterHandler } from './server/clusters/process/ForkProcessClusterHandler'; +import type { + IClusterHandler, + IClusterHandlerConstructor, + ClusterHandlerSendOptions, +} from './server/clusters/base/IClusterHandler'; +import type { ShardPingOptions } from './server/ShardPing'; +import { + fetchRecommendedShards, + FetchRecommendedShardsOptions, + fetchRecommendedShardsOptionsPredicate, +} from './server/utils/utils'; +import type { NonNullObject } from './utils/types'; + +const shardingManagerOptionsPredicate = z.strictObject({ + shardList: z.literal('auto').or(z.array(z.number().positive().int()).nonempty()).default('auto'), + totalShards: z.literal('auto').or(z.number().int().gte(1)).default('auto'), + clusterCount: z + .number() + .int() + .gte(1) + .default(() => cpus().length), + token: z.string().nonempty().nullish().default(null), + rest: z.instanceof(REST).optional(), + respawns: z.number().int().positive().or(z.literal(-1)).or(z.literal(Infinity)).default(-1), + shardOptions: z.object({}).default({}), + pingOptions: z.strictObject({ + delay: z.number().int().gte(1).or(z.literal(-1)).or(z.literal(Infinity)).default(45_000), + delaySinceReceived: z.boolean().default(false), + }), + ClusterHandler: z.object({ spawn: z.function(), validate: z.function(), isPrimary: z.boolean() }).optional(), + MessageHandler: z.object({ build: z.function() }).optional(), +}); + +const shardingManagerSpawnOptions = z + .strictObject({ + amount: z.literal('auto').or(z.number().int().gte(1)).optional(), + delay: z.number().int().positive().default(5500), + timeout: z.number().int().positive().default(30_000), + }) + .merge(fetchRecommendedShardsOptionsPredicate); + +const shardingManagerRespawnAllOptions = z.strictObject({ + shardDelay: z.number().int().positive().default(5000), + respawnDelay: z.number().int().positive().default(500), + timeout: z.number().int().positive().or(z.literal(-1)).or(z.literal(Infinity)).default(30000), +}); + +/** + * The ShardingManager is an utility class that makes multi-thread sharding of a bot a much simpler experience. + * + * It works by spawning shards that will create a channel to a different thread, process, or server, defined by the + * implementation of the defined {@link IClusterHandler}, and sends messages between the primary process (the one that + * spawns the shards) with the shards using a message queue and message format as defined by {@link IMessageHandler}. + * + * Furthermore, this utility has several useful methods that allow managing the lifetimes of the shards as well as + * sending messages to one or all of them. + */ +export class ShardingManager extends EventEmitter { + /** + * Number of total shards of all shard managers or "auto". + */ + public shardList: [number, ...number[]] | 'auto'; + + /** + * List of shards to spawn or "auto". + */ + public totalShards: number | 'auto'; + + /** + * The number of clusters to create. + */ + public clusterCount: number; + + /** + * Whether or not the shards should respawn. + */ + public readonly respawns: number; + + /** + * The REST handler. + */ + public readonly rest: REST; + + /** + * The shard options validated by {@link IClusterHandlerConstructor.validate}. + */ + public readonly options: ShardOptions; + + public readonly pingOptions: Required; + + /** + * The {@link IClusterHandler} constructor. + */ + public readonly ClusterHandler: IClusterHandlerConstructor; + + /** + * The {@link IMessageHandler} constructor. + */ + public readonly MessageHandler: IMessageHandlerConstructor; + + /** + * Token to use for automatic shard count and passing to shards. + */ + public readonly token: string | null; + + public readonly shards: IClusterHandler[] = []; + + public constructor(options: ShardingManagerOptions = {}) { + super(); + + const resolved = shardingManagerOptionsPredicate.parse(options); + this.shardList = resolved.shardList; + this.totalShards = resolved.totalShards; + this.clusterCount = resolved.clusterCount; + this.respawns = resolved.respawns; + this.token = resolved.token?.replace(/^Bot\s*/i, '') ?? null; + this.rest = resolved.rest ?? new REST().setToken(this.token!); + + this.ClusterHandler = options.ClusterHandler ?? (ForkProcessClusterHandler as any); + this.options = this.ClusterHandler.validate(options.shardOptions); + this.pingOptions = resolved.pingOptions; + this.ClusterHandler.setup(this.options); + + this.MessageHandler = options.MessageHandler ?? JsonMessageHandler; + } + + /** + * Whether or not the process is a primary one. + */ + public get isPrimary() { + return this.ClusterHandler.isPrimary; + } + + /** + * Spawns all shards given the options and that the process is not primary. + * @param options The spawn options. + * @returns Whether or not the spawn has happened. Will always be the opposite of {@link IClusterHandlerConstructor.isPrimary}. + */ + public async spawn(options: ShardingManagerSpawnOptions): Promise { + if (this.isPrimary) return false; + + const resolved = shardingManagerSpawnOptions.parse(options); + let amount = resolved.amount ?? this.totalShards; + + if (amount === 'auto') { + amount = await fetchRecommendedShards(this.rest, resolved); + } + + if (this.shards.length >= amount) throw new Error('Shards have already spawned.'); + if (this.shardList === 'auto' || this.totalShards === 'auto' || this.totalShards !== amount) { + this.shardList = [...Array(amount).keys()] as [number, ...number[]]; + } + + if (this.totalShards === 'auto' || this.totalShards !== amount) { + this.totalShards = amount; + } + + const shards = z.number().int().gte(0).lt(amount).array().parse(this.shardList); + const clusterIds = chunk(shards, Math.ceil(shards.length / this.clusterCount)); + + // Spawn the shards + for (const shardIds of clusterIds) { + await Promise.all([ + this.spawnShard(shardIds), + resolved.delay > 0 && this.shards.length !== clusterIds.length ? sleep(resolved.delay) : Promise.resolve(), + ]); + } + + return true; + } + + public async spawnShard(shardIds: readonly number[]) { + const shard = new this.ClusterHandler(shardIds, this, this.MessageHandler); + await shard.start({ timeout: 0 }); + + this.shards.push(shard); + + this.emit('shardCreate', shard); + return shard; + } + + /** + * Kills all running shards and respawns them. + * @param options The options for respawning shards. + */ + public async respawnAll(options: ShardingManagerRespawnAllOptions) { + const resolved = shardingManagerRespawnAllOptions.parse(options); + + let s = 0; + for (const shard of this.shards) { + await Promise.all([ + shard.restart({ delay: resolved.respawnDelay, timeout: resolved.timeout }), + ++s < this.shards.length && resolved.shardDelay > 0 ? sleep(resolved.shardDelay) : Promise.resolve(), + ]); + } + } + + /** + * Sends a message to all shards. + * @param data The data to be sent to the shards. + * @param options The options to be passed to each {@link IClusterHandler.send}. + * @returns An array of the resolved values from {@link IClusterHandler.send}. + */ + public broadcast(data: unknown, options: ClusterHandlerSendOptions): Promise { + const promises = []; + for (const shard of this.shards.values()) promises.push(shard.send(data, options)); + return Promise.all(promises); + } +} + +export interface ShardingManagerOptions { + /** + * Number of total shards of all shard managers or "auto". + * @default 'auto' + */ + totalShards?: number | 'auto'; + + /** + * List of shards to spawn or "auto". + * @default 'auto' + */ + shardList?: [number, ...number[]] | 'auto'; + + /** + * The number of clusters to create, defaults to the number of cores. + * @default + */ + clusters?: number; + + /** + * The amount of times to respawn shards (`-1` or `Infinity` for no limitless) + * @default -1 + */ + respawns?: number; + + /** + * The {@link IClusterHandler} builder. + * @default ForkProcessClusterHandler + */ + ClusterHandler?: IClusterHandlerConstructor; + + /** + * The shard options. + * @default {} + */ + shardOptions?: ShardOptions; + + /** + * The options for the shard pinging. + * @default {} + */ + pingOptions?: ShardPingOptions; + + /** + * The {@link IMessageHandler} builder. + * @default JsonMessageHandler + */ + MessageHandler?: IMessageHandlerConstructor; + + /** + * Token to use for automatic shard count and passing to shards. + * @default process.env.DISCORD_TOKEN + */ + token?: string; +} + +export interface ShardingManagerSpawnOptions extends FetchRecommendedShardsOptions { + /** + * Number of shards to spawn. + * @default this.totalShards + */ + amount?: number | 'auto'; + + /** + * How long to wait in between spawning each shard, in milliseconds. + * @default 5500 + */ + delay?: number; + + /** + * The amount in milliseconds to wait until the shard has become ready. + * @default 30000 + */ + timeout?: number; +} + +export interface ShardingManagerRespawnAllOptions { + /** + * How long to wait between shards, in milliseconds. + * @default 5000 + */ + shardDelay?: number; + + /** + * How long to wait between killing a shard's process and restarting it, in milliseconds. + * @default 500 + */ + respawnDelay?: number; + + /** + * The amount in milliseconds to wait for a shard to become ready before, continuing to another (`-1` or `Infinity` for no wait). + * @default 30000 + */ + timeout?: number; +} diff --git a/packages/sharder/src/index.ts b/packages/sharder/src/index.ts new file mode 100644 index 0000000..9bd6328 --- /dev/null +++ b/packages/sharder/src/index.ts @@ -0,0 +1,5 @@ +export * from './messages'; +export * from './server/clusters'; +export * from './server/ShardPing'; +export * from './ShardingManager'; +export * from './utils/types'; diff --git a/packages/sharder/src/messages/BinaryMessageHandler.ts b/packages/sharder/src/messages/BinaryMessageHandler.ts new file mode 100644 index 0000000..2971a1b --- /dev/null +++ b/packages/sharder/src/messages/BinaryMessageHandler.ts @@ -0,0 +1,18 @@ +import { BaseMessageHandler } from './base/BaseMessageHandler'; +import { DeserializedData, MessageOp, SerializedData } from './base/IMessageHandler'; +import { serialize, deserialize } from 'node:v8'; + +export class BinaryMessageHandler extends BaseMessageHandler { + public get name(): string { + return 'binary'; + } + + public serialize(data: unknown, op: MessageOp = MessageOp.Message, id?: number): SerializedData { + id ??= this.nextId; + return { id, body: serialize({ id, op, data }) }; + } + + public deserialize(data: Buffer): DeserializedData { + return deserialize(data); + } +} diff --git a/packages/sharder/src/messages/JsonMessageHandler.ts b/packages/sharder/src/messages/JsonMessageHandler.ts new file mode 100644 index 0000000..b47cc05 --- /dev/null +++ b/packages/sharder/src/messages/JsonMessageHandler.ts @@ -0,0 +1,17 @@ +import { BaseMessageHandler } from './base/BaseMessageHandler'; +import { DeserializedData, MessageOp, SerializedData } from './base/IMessageHandler'; + +export class JsonMessageHandler extends BaseMessageHandler { + public get name(): string { + return 'json'; + } + + public serialize(data: unknown, op: MessageOp = MessageOp.Message, id?: number): SerializedData { + id ??= this.nextId; + return { id, body: JSON.stringify({ id, op, data }) }; + } + + public deserialize(data: string): DeserializedData { + return JSON.parse(data); + } +} diff --git a/packages/sharder/src/messages/RawMessageHandler.ts b/packages/sharder/src/messages/RawMessageHandler.ts new file mode 100644 index 0000000..5ff0fd5 --- /dev/null +++ b/packages/sharder/src/messages/RawMessageHandler.ts @@ -0,0 +1,17 @@ +import { BaseMessageHandler } from './base/BaseMessageHandler'; +import { DeserializedData, MessageOp, SerializedData } from './base/IMessageHandler'; + +export class RawMessageHandler extends BaseMessageHandler { + public get name(): string { + return 'raw'; + } + + public serialize(data: unknown, op: MessageOp = MessageOp.Message, id?: number): SerializedData { + id ??= this.nextId; + return { id, body: { id, op, data } }; + } + + public deserialize(data: unknown): DeserializedData { + return data as DeserializedData; + } +} diff --git a/packages/sharder/src/messages/base/BaseMessageHandler.ts b/packages/sharder/src/messages/base/BaseMessageHandler.ts new file mode 100644 index 0000000..dd8b5a5 --- /dev/null +++ b/packages/sharder/src/messages/base/BaseMessageHandler.ts @@ -0,0 +1,53 @@ +import { createDeferredPromise, DeferredPromise } from '../../server/utils/utils'; +import type { DeserializedData, IMessageHandler, MessageOp, SerializedData } from './IMessageHandler'; + +export abstract class BaseMessageHandler implements IMessageHandler { + private lastId = 0; + private readonly tracked = new Map(); + + public abstract serialize(data: unknown, op?: MessageOp, id?: number): SerializedData; + public abstract deserialize(data: SerializedType): DeserializedData; + + public abstract get name(): string; + + protected get nextId() { + return this.lastId++; + } + + public handle(id: number, data: unknown): void { + const deferred = this.tracked.get(id); + if (!deferred) return; + + this.tracked.delete(id); + deferred.resolve(data); + } + + public track(id: number): void { + if (this.tracked.has(id)) throw new Error(`Duplicate message id: ${id}`); + this.tracked.set(id, createDeferredPromise()); + } + + public untrack(id: number): void { + const deferred = this.tracked.get(id); + if (!deferred) return; + + this.tracked.delete(id); + deferred.reject(new Error('Aborted tracking')); + } + + public clear(): void { + for (const deferred of this.tracked.values()) { + deferred.reject(new Error('Aborted tracking')); + } + + this.tracked.clear(); + } + + public waitForId(id: number): Promise { + return this.tracked.get(id)?.promise ?? Promise.reject(new Error(`The message id ${id} is not being tracked`)); + } + + public static build(): IMessageHandler { + return Reflect.construct(this, []); + } +} diff --git a/packages/sharder/src/messages/base/IMessageHandler.ts b/packages/sharder/src/messages/base/IMessageHandler.ts new file mode 100644 index 0000000..8b8ed34 --- /dev/null +++ b/packages/sharder/src/messages/base/IMessageHandler.ts @@ -0,0 +1,40 @@ +export interface IMessageHandler { + get name(): string; + + serialize(data: unknown, op?: MessageOp, id?: number): SerializedData; + deserialize(data: SerializedType): DeserializedData; + + handle(id: number, data: unknown): void; + + track(id: number): void; + untrack(id: number): void; + clear(): void; + + waitForId(id: number): Promise; +} + +export const enum MessageOp { + Ping, + Ready, + Disconnected, + Reconnecting, + RespawnAll, + Message, +} + +export interface SerializedData { + id: number; + body: SerializedType; +} + +export interface DeserializedData { + id: number; + op: MessageOp; + data: unknown; +} + +export type IMessageHandlerConstructor = new () => IMessageHandler; + +// export interface IMessageHandlerConstructor { +// build(): IMessageHandler; +// } diff --git a/packages/sharder/src/messages/index.ts b/packages/sharder/src/messages/index.ts new file mode 100644 index 0000000..6dd2dd7 --- /dev/null +++ b/packages/sharder/src/messages/index.ts @@ -0,0 +1,22 @@ +import type { IMessageHandlerConstructor } from './base/IMessageHandler'; +import { BinaryMessageHandler } from './BinaryMessageHandler'; +import { JsonMessageHandler } from './JsonMessageHandler'; +import { RawMessageHandler } from './RawMessageHandler'; + +export * from './base/BaseMessageHandler'; +export * from './base/IMessageHandler'; +export * from './BinaryMessageHandler'; +export * from './JsonMessageHandler'; +export * from './RawMessageHandler'; + +export const messageHandlers = new Map>([ + ['binary', BinaryMessageHandler], + ['json', JsonMessageHandler], + ['raw', RawMessageHandler], +]); + +export interface MessageHandlers { + binary: BinaryMessageHandler; + json: JsonMessageHandler; + raw: RawMessageHandler; +} diff --git a/packages/sharder/src/server/ShardPing.ts b/packages/sharder/src/server/ShardPing.ts new file mode 100644 index 0000000..4bc7ec6 --- /dev/null +++ b/packages/sharder/src/server/ShardPing.ts @@ -0,0 +1,87 @@ +import { MessageOp } from '../messages/base/IMessageHandler'; +import type { IClusterHandler } from './clusters/base/IClusterHandler'; +import type { NonNullObject } from '../utils/types'; + +export class ShardPing { + public lastSentTimestamp = -1; + public lastReceivedTimestamp = -1; + public lastLatency = -1; + public readonly shard: IClusterHandler; + + private timeout: NodeJS.Timer | null = null; + + public constructor(shard: IClusterHandler) { + this.shard = shard; + } + + public start() { + if (this.timeout !== null) return this; + + const { delay } = this.options; + if (delay < 0 || delay === Infinity) return this.stop(); + + this.timeout = setTimeout(() => void this.send(), delay).unref(); + + return this; + } + + public stop() { + if (this.timeout === null) return this; + + clearTimeout(this.timeout); + this.timeout = null; + + return this; + } + + public async send() { + this.lastSentTimestamp = Date.now(); + await this.shard.send(this.lastSentTimestamp, { opcode: MessageOp.Ping }); + + // If the timeout is to be refreshed on send, refresh: + if (!this.options.delaySinceReceived) this.timeout?.refresh(); + } + + public receive(timestamp: number) { + this.lastReceivedTimestamp = Date.now(); + this.lastLatency = this.lastReceivedTimestamp - timestamp; + + // If the timeout is to be refreshed on receive, refresh: + if (this.options.delaySinceReceived) this.timeout?.refresh(); + } + + public get options() { + return this.shard.manager.pingOptions; + } + + public get hasReceivedResponse() { + return this.lastReceivedTimestamp >= this.lastSentTimestamp; + } + + public get lastSentAt(): Date { + return new Date(this.lastSentTimestamp); + } + + public get lastReceivedAt(): Date { + return new Date(this.lastReceivedTimestamp); + } + + public get nextPingIn() { + const { delay, delaySinceReceived } = this.options; + return delay + (delaySinceReceived ? this.lastReceivedTimestamp : this.lastSentTimestamp); + } +} + +export interface ShardPingOptions { + /** + * The delay in milliseconds between pings. + * @default 45_000 + */ + delay?: number; + + /** + * Whether the next ping should happen after {@link ShardPing.lastReceivedTimestamp} or after {@link ShardPing.lastSentTimestamp}. + * @default false + */ + delaySinceReceived?: boolean; +} diff --git a/packages/sharder/src/server/clusters/base/BaseClusterHandler.ts b/packages/sharder/src/server/clusters/base/BaseClusterHandler.ts new file mode 100644 index 0000000..5026d4a --- /dev/null +++ b/packages/sharder/src/server/clusters/base/BaseClusterHandler.ts @@ -0,0 +1,151 @@ +import { EventEmitter } from 'node:events'; +import { setTimeout as sleep } from 'node:timers/promises'; +import { IMessageHandler, IMessageHandlerConstructor, MessageOp } from '../../../messages/base/IMessageHandler'; +import type { ShardingManager, ShardingManagerRespawnAllOptions } from '../../../ShardingManager'; +import { ShardPing } from '../../ShardPing'; +import type { NonNullObject } from '../../../utils/types'; +import type { + IClusterHandler, + ClusterHandlerRestartOptions, + ClusterHandlerSendOptions, + ClusterHandlerStartOptions, +} from './IClusterHandler'; + +export abstract class BaseClusterHandler + extends EventEmitter + implements IClusterHandler +{ + public readonly ids: readonly number[]; + + public readonly manager: ShardingManager; + + public readonly ping: ShardPing; + + /** + * Whether or not the shard's client is ready. + */ + public ready = false; + + protected respawnsLeft: number; + + protected readonly messages: IMessageHandler; + + public constructor( + ids: readonly number[], + manager: ShardingManager, + MessageHandler: IMessageHandlerConstructor, + ) { + super(); + this.ids = ids; + this.manager = manager; + this.respawnsLeft = this.manager.respawns; + this.ping = new ShardPing(this); + this.messages = new MessageHandler(); + } + + public async send(data: unknown, options: ClusterHandlerSendOptions = {}): Promise { + const serialized = this.messages.serialize(data, options.opcode ?? MessageOp.Message); + const reply = options.reply ?? true; + + if (reply) this.messages.track(serialized.id); + + try { + await this.sendMessage(serialized.body); + } catch (error) { + this.messages.untrack(serialized.id); + throw error; + } + + return reply ? this.messages.waitForId(serialized.id) : null; + } + + /** + * Closes and restarts the shard. + * @param options The options for respawning the shard. + */ + public async restart(options: ClusterHandlerRestartOptions): Promise { + await this.close(); + if (options.delay > 0) await sleep(options.delay); + await this.start(options); + } + + public abstract start(options: ClusterHandlerStartOptions): Promise | void; + + /** + * Closes the shard and does not restart it. + */ + public abstract close(): Promise | void; + + protected abstract sendMessage(data: unknown): Promise; + + protected _consumeRespawn() { + if (this.respawnsLeft === 0) return false; + + if (this.respawnsLeft !== -1 && this.respawnsLeft !== Infinity) --this.respawnsLeft; + return true; + } + + protected _handleMessage(message: string) { + const deserialized = this.messages.deserialize(message); + this.messages.handle(deserialized.id, deserialized.data); + + switch (deserialized.op) { + case MessageOp.Ping: { + this.ping.receive(deserialized.data as number); + break; + } + case MessageOp.Ready: { + this.ready = true; + this.respawnsLeft = this.manager.respawns; + this.manager.emit('shardReady', this); + this.emit('ready'); + break; + } + case MessageOp.Disconnected: { + this.ready = false; + this.manager.emit('shardDisconnected', this); + this.emit('disconnected'); + break; + } + case MessageOp.Reconnecting: { + this.ready = false; + this.manager.emit('shardReconnecting', this); + this.emit('reconnecting'); + break; + } + case MessageOp.RespawnAll: { + this.manager + .respawnAll(deserialized.data as ShardingManagerRespawnAllOptions) + .catch((error) => this.send({ success: false, error }, { id: deserialized.id, reply: false })) + .catch(() => void 0); + break; + } + case MessageOp.Message: { + this.manager.emit('shardMessage', this); + this.emit('message'); + break; + } + } + } + + protected _handleStart() { + this.ping.start(); + } + + protected _handleStop() { + this.ping.stop(); + } + + public static setup(options: NonNullObject): void; + public static setup() { + // NOP + } + + public static validate(value: unknown): NonNullObject { + return value as NonNullObject; + } + + public static get isPrimary() { + return false; + } +} diff --git a/packages/sharder/src/server/clusters/base/IClusterHandler.ts b/packages/sharder/src/server/clusters/base/IClusterHandler.ts new file mode 100644 index 0000000..a4222de --- /dev/null +++ b/packages/sharder/src/server/clusters/base/IClusterHandler.ts @@ -0,0 +1,96 @@ +import type { IMessageHandlerConstructor, MessageOp } from '../../../messages/base/IMessageHandler'; +import type { ShardingManager } from '../../../ShardingManager'; +import type { NonNullObject } from '../../../utils/types'; + +/** + * The shard handler is a strategy system that manages the lifetime and a channel to the client shard, this class is + * used exclusively in the primary process alongside {@link ShardingManager}, and can be passed in + * {@link ShardingManagerOptions.ClusterHandler}. + * + * To create your own strategy, the easiest way is to create a class extending any of the following bases: + * + * - {@link BaseClusterHandler}: defines the bare-basic implementation. + * - {@link BaseProcessClusterHandler}: defines an almost-full implementation, works with {@link ChildProcess} and + * {@link Worker}. + * + * Furthermore, the library ships the following built-in handlers: + * + * - {@link ForkProcessClusterHandler}: defines a process-based sharding using `child_process.fork`. + * - {@link ClusterProcessClusterHandler}: defines a process-based sharding using `cluster.fork`. + */ +export interface IClusterHandler { + /** + * The shard IDs. + */ + readonly ids: readonly number[]; + + /** + * The manager that instantiated the shard handler. + */ + readonly manager: ShardingManager; + + /** + * Sends data to the shard. + * @param data The data to be sent. + * @param options The options for the message delivery. + */ + send(data: unknown, options?: ClusterHandlerSendOptions): Promise; + + /** + * Starts the shard. + * @param options The options defining the start-up behavior. + */ + start(options: ClusterHandlerStartOptions): Promise | void; + + /** + * Closes the shard and terminates the communication with the client. + */ + close(): Promise | void; + + /** + * Restarts the shard handler, may call {@link start} and then {@link close}. + * @param options The options defining the respawn behavior. + */ + restart(options: ClusterHandlerRestartOptions): Promise; +} + +export interface ClusterHandlerSendOptions { + id?: number; + reply?: boolean; + opcode?: MessageOp; +} + +export interface ClusterHandlerStartOptions { + timeout?: number | undefined; +} + +export interface ClusterHandlerRestartOptions { + delay: number; + timeout: number; +} + +export interface IClusterHandlerConstructor { + new ( + ids: readonly number[], + manager: ShardingManager, + messageBuilder: IMessageHandlerConstructor, + ): IClusterHandler; + + /** + * Sets up the shard handler for subsequent runs. + * @param options The options passed in {@link ShardingManagerOptions.shardOptions}. + */ + setup(options: ResolvedOptions): void; + + /** + * Validates the shard options. + * @param value The options passed in {@link ShardingManagerOptions.shardOptions}. + * @returns The validated values with the defined values. + */ + validate(value: unknown): ResolvedOptions; + + /** + * Whether or not the process is a primary one. + */ + readonly isPrimary: boolean; +} diff --git a/packages/sharder/src/server/clusters/index.ts b/packages/sharder/src/server/clusters/index.ts new file mode 100644 index 0000000..bb2b18b --- /dev/null +++ b/packages/sharder/src/server/clusters/index.ts @@ -0,0 +1,19 @@ +import type { IClusterHandlerConstructor } from './base/IClusterHandler'; +import { ClusterProcessClusterHandler } from './process/ClusterProcessClusterHandler'; +import { ForkProcessClusterHandler } from './process/ForkProcessClusterHandler'; + +export * from './base/BaseClusterHandler'; +export * from './base/IClusterHandler'; +export * from './process/BaseProcessClusterHandler'; +export * from './process/ClusterProcessClusterHandler'; +export * from './process/ForkProcessClusterHandler'; + +export const shardHandlers = new Map>([ + ['cluster', ClusterProcessClusterHandler], + ['fork', ForkProcessClusterHandler], +]); + +export interface ShardHandlers { + cluster: ClusterProcessClusterHandler['manager']['options']; + fork: ForkProcessClusterHandler['manager']['options']; +} diff --git a/packages/sharder/src/server/clusters/process/BaseProcessClusterHandler.ts b/packages/sharder/src/server/clusters/process/BaseProcessClusterHandler.ts new file mode 100644 index 0000000..f853854 --- /dev/null +++ b/packages/sharder/src/server/clusters/process/BaseProcessClusterHandler.ts @@ -0,0 +1,145 @@ +import type { ChildProcess } from 'node:child_process'; +import type { Worker } from 'node:cluster'; +import { once } from 'node:events'; +import { z } from 'zod'; +import type { IMessageHandlerConstructor } from '../../../messages/base/IMessageHandler'; +import type { ShardingManager } from '../../../ShardingManager'; +import { createDeferredPromise } from '../../utils/utils'; +import { BaseClusterHandler } from '../base/BaseClusterHandler'; +import type { ClusterHandlerStartOptions } from '../base/IClusterHandler'; + +const baseProcessClusterHandlerOptionsPredicate = z.strictObject({ + shardArgs: z.string().array().default([]), + execArgv: z.string().array().default([]), +}); + +export abstract class BaseProcessClusterHandler< + Process extends ChildProcess | Worker, + ClusterOptions = BaseProcessClusterHandlerOptions, +> extends BaseClusterHandler { + /** + * Environment variables for the shard's process. + */ + public readonly env: Record; + + public process: Process | null = null; + + private _exitListener: ((options: HandleExitOptions) => void) | null = null; + + public constructor( + ids: readonly number[], + manager: ShardingManager, + messageBuilder: IMessageHandlerConstructor, + ) { + super(ids, manager, messageBuilder); + + this.env = { + ...process.env, + SHARDING_MANAGER: 'true', + SHARDING_MANAGER_CLUSTER_STRATEGY: Reflect.get(this, 'name') as string, + SHARDING_MANAGER_MESSAGE_STRATEGY: this.messages.name, + SHARDS: JSON.stringify(this.ids), + SHARD_COUNT: this.manager.totalShards.toString(), + }; + if (this.manager.token) this.env.DISCORD_TOKEN = this.manager.token; + } + + public abstract get name(): string; + + public async start({ timeout = 30_000 }: ClusterHandlerStartOptions = {}): Promise { + if (this.process !== null) throw new Error('The process was already started.'); + + this._exitListener = this._handleStop.bind(this); + + this.process = this.createProcess(); + + this.process.on('message', this._handleMessage.bind(this)); + this.process.on('exit', this._exitListener); + + if (timeout === -1 || timeout === Infinity) { + this._handleStart(); + return; + } + + const abortController = new AbortController(); + const timer = setTimeout(() => abortController.abort(), timeout).unref(); + + try { + await once(this.process, 'spawn', { signal: abortController.signal }); + await once(this, 'ready', { signal: abortController.signal }); + } finally { + clearTimeout(timer); + } + + this._handleStart(); + } + + public async close(): Promise { + if (this.process === null) throw new Error('The process was already closed.'); + + this.process.off('exit', this._exitListener!); + this.process.kill(); + + await this._handleStop({ respawn: false }); + } + + protected sendMessage(data: string | Buffer): Promise { + if (this.process === null) return Promise.reject(new Error('The process was not initialized.')); + + const deferred = createDeferredPromise(); + this.process.send(data, (error) => { + if (error) deferred.reject(error); + else deferred.resolve(); + }); + + return deferred.promise; + } + + protected abstract createProcess(): Process; + + protected override _handleStart() { + super._handleStart(); + + this.manager.emit('shardSpawn', this); + this.emit('spawn'); + } + + protected override async _handleStop(options: HandleExitOptions = {}) { + super._handleStop(); + this.manager.emit('shardDeath', this); + this.emit('death'); + + this.ready = false; + this.process = null; + + if (options.respawn ?? this._consumeRespawn()) await this.start({ timeout: options.timeout }); + } + + public static override validate(value: unknown): Required { + return baseProcessClusterHandlerOptionsPredicate.parse(value); + } +} + +export interface BaseProcessClusterHandlerOptions { + /** + * Arguments to pass to the shard script when spawning. + */ + shardArgs?: string[]; + + /** + * Arguments to pass to the shard script executable when spawning. + */ + execArgv?: string[]; +} + +interface HandleExitOptions { + /** + * Whether or not to spawn the shard again. + */ + respawn?: boolean; + + /** + * The amount in milliseconds to wait until the client has become ready (`-1` or `Infinity` for no wait). + */ + timeout?: number; +} diff --git a/packages/sharder/src/server/clusters/process/ClusterProcessClusterHandler.ts b/packages/sharder/src/server/clusters/process/ClusterProcessClusterHandler.ts new file mode 100644 index 0000000..4969dde --- /dev/null +++ b/packages/sharder/src/server/clusters/process/ClusterProcessClusterHandler.ts @@ -0,0 +1,20 @@ +import cluster, { Worker } from 'node:cluster'; +import { BaseProcessClusterHandler, BaseProcessClusterHandlerOptions } from './BaseProcessClusterHandler'; + +export class ClusterProcessClusterHandler extends BaseProcessClusterHandler { + public get name(): string { + return 'cluster'; + } + + protected override createProcess(): Worker { + return cluster.fork(this.env); + } + + public static override setup(options: BaseProcessClusterHandlerOptions) { + cluster.setupPrimary({ execArgv: options.execArgv, args: options.shardArgs }); + } + + public static override get isPrimary() { + return cluster.isPrimary; + } +} diff --git a/packages/sharder/src/server/clusters/process/ForkProcessClusterHandler.ts b/packages/sharder/src/server/clusters/process/ForkProcessClusterHandler.ts new file mode 100644 index 0000000..b81aee0 --- /dev/null +++ b/packages/sharder/src/server/clusters/process/ForkProcessClusterHandler.ts @@ -0,0 +1,37 @@ +import { ChildProcess, fork } from 'node:child_process'; +import { z } from 'zod'; +import { pathToFilePredicate } from '../../utils/utils'; +import { BaseProcessClusterHandler, BaseProcessClusterHandlerOptions } from './BaseProcessClusterHandler'; + +const forkProcessClusterHandlerOptionsPredicate = z.strictObject({ + shardArgs: z.string().array().default([]), + execArgv: z.string().array().default([]), + path: pathToFilePredicate, +}); + +export class ForkProcessClusterHandler extends BaseProcessClusterHandler< + ChildProcess, + ForkProcessClusterHandlerOptions +> { + public get name(): string { + return 'fork'; + } + + protected createProcess(): ChildProcess { + return fork(this.manager.options.path, this.manager.options.shardArgs, { + env: this.env, + execArgv: this.manager.options.execArgv, + }); + } + + public static override validate(value: unknown): Required { + return forkProcessClusterHandlerOptionsPredicate.parse(value); + } +} + +export interface ForkProcessClusterHandlerOptions extends BaseProcessClusterHandlerOptions { + /** + * The path of the file to fork. + */ + path: string; +} diff --git a/packages/sharder/src/server/utils/utils.ts b/packages/sharder/src/server/utils/utils.ts new file mode 100644 index 0000000..48df6b9 --- /dev/null +++ b/packages/sharder/src/server/utils/utils.ts @@ -0,0 +1,59 @@ +import type { REST } from '@discordjs/rest'; +import { RESTGetAPIGatewayBotResult, Routes } from 'discord-api-types/v9'; +import { statSync } from 'node:fs'; +import { resolve } from 'node:path'; +import { z } from 'zod'; + +export const pathToFilePredicate = z + .string() + .transform(resolve) + .refine((path) => statSync(path).isFile(), { message: 'Could not resolve path to a file' }); + +export function createDeferredPromise() { + let resolve!: (value: T | PromiseLike) => void; + let reject!: (reason?: unknown) => void; + + const promise = new Promise((res, rej) => { + resolve = res; + reject = rej; + }); + + return { promise, resolve, reject }; +} + +export interface DeferredPromise { + resolve(value: T | PromiseLike): void; + reject(reason?: unknown): void; + promise: Promise; +} + +export const fetchRecommendedShardsOptionsPredicate = z.strictObject({ + guildsPerShard: z.number().int().gt(0).default(1000), + multipleOf: z.number().int().gt(0).default(1), +}); + +/** + * Gets the recommended shard count from Discord. + * @param rest The REST handler. + * @param options The options for the recommended shards. + * @returns The amount of shards to create. + */ +export async function fetchRecommendedShards(rest: REST, options: FetchRecommendedShardsOptions = {}) { + const { guildsPerShard, multipleOf } = fetchRecommendedShardsOptionsPredicate.parse(options); + const { shards } = (await rest.get(Routes.gatewayBot())) as RESTGetAPIGatewayBotResult; + return Math.ceil((shards * (1_000 / guildsPerShard)) / multipleOf) * multipleOf; +} + +export interface FetchRecommendedShardsOptions { + /** + * Number of guilds assigned per shard. + * @default 1000 + */ + guildsPerShard?: number; + + /** + * The multiple the shard count should round up to. (16 for large bot sharding). + * @default 1 + */ + multipleOf?: number; +} diff --git a/packages/sharder/src/utils/types.ts b/packages/sharder/src/utils/types.ts new file mode 100644 index 0000000..4adbbb8 --- /dev/null +++ b/packages/sharder/src/utils/types.ts @@ -0,0 +1,2 @@ +// eslint-disable-next-line @typescript-eslint/ban-types +export type NonNullObject = object & {}; diff --git a/packages/sharder/tsconfig.json b/packages/sharder/tsconfig.json new file mode 100644 index 0000000..258fc8e --- /dev/null +++ b/packages/sharder/tsconfig.json @@ -0,0 +1,9 @@ +{ + "extends": "../tsconfig.json", + "compilerOptions": { + "sourceRoot": "./", + "rootDir": "./src", + "outDir": "dist" + }, + "include": ["src/**/*.ts"] +} diff --git a/packages/sharder/tsup.config.ts b/packages/sharder/tsup.config.ts new file mode 100644 index 0000000..0de3ecc --- /dev/null +++ b/packages/sharder/tsup.config.ts @@ -0,0 +1 @@ +export * from '../../tsup.config'; diff --git a/yarn.lock b/yarn.lock index 6fdbe36..798a28a 100644 --- a/yarn.lock +++ b/yarn.lock @@ -2192,6 +2192,11 @@ resolved "https://registry.yarnpkg.com/@sapphire/snowflake/-/snowflake-2.1.2.tgz#39c9b387ce2f5983a80400e32b9a5e193ad68490" integrity sha512-BPT2FwukBDVCviNLr45ePnMCdr7ZfjHB5ysWzDzWEY6fPGEkOUN/Hl/XS9jKrJB7SN2U1JFQdZGvpRtSqHo2AQ== +"@sapphire/utilities@^3.1.0": + version "3.1.0" + resolved "https://registry.yarnpkg.com/@sapphire/utilities/-/utilities-3.1.0.tgz#50f18c8ae10535c4fde6b2c68c2a45262c45756f" + integrity sha512-Vwsd567hZ1LPDh791r4DHwxlO1EzUefjYhNj+kwbliHWK2TVPhMvF5v1eZKtUCgdBmobqa8CBPwCwymHstuj5w== + "@sinonjs/commons@^1.7.0": version "1.8.3" resolved "https://registry.yarnpkg.com/@sinonjs/commons/-/commons-1.8.3.tgz#3802ddd21a50a949b6721ddd72da36e67e7f1b2d" @@ -3580,6 +3585,11 @@ discord-api-types@^0.23.1: resolved "https://registry.yarnpkg.com/discord-api-types/-/discord-api-types-0.23.1.tgz#832d0ee2b3c8e2eae02947c1dbf38121d6d357d5" integrity sha512-igWmn+45mzXRWNEPU25I/pr8MwxHb767wAr51oy3VRLRcTlp5ADBbrBR0lq3SA1Rfw3MtM4TQu1xo3kxscfVdQ== +discord-api-types@^0.25.2: + version "0.25.2" + resolved "https://registry.yarnpkg.com/discord-api-types/-/discord-api-types-0.25.2.tgz#e50ed152e6d48fe7963f5de1002ca6f2df57c61b" + integrity sha512-O243LXxb5gLLxubu5zgoppYQuolapGVWPw3ll0acN0+O8TnPUE2kFp9Bt3sTRYodw8xFIknOVxjSeyWYBpVcEQ== + doctrine@^2.1.0: version "2.1.0" resolved "https://registry.yarnpkg.com/doctrine/-/doctrine-2.1.0.tgz#5cd01fc101621b42c4cd7f5d1a66243716d3f39d" @@ -8427,3 +8437,8 @@ yocto-queue@^0.1.0: version "0.1.0" resolved "https://registry.yarnpkg.com/yocto-queue/-/yocto-queue-0.1.0.tgz#0294eb3dee05028d31ee1a5fa2c556a6aaf10a1b" integrity sha512-rVksvsnNCdJ/ohGc6xgPwyN8eheCxsiLM8mxuE/t/mOVqJewPuO1miLpTHQiRgTKCLexL4MeAFVagts7HmNZ2Q== + +zod@^3.11.6: + version "3.11.6" + resolved "https://registry.yarnpkg.com/zod/-/zod-3.11.6.tgz#e43a5e0c213ae2e02aefe7cb2b1a6fa3d7f1f483" + integrity sha512-daZ80A81I3/9lIydI44motWe6n59kRBfNzTuS2bfzVh1nAXi667TOTWWtatxyG+fwgNUiagSj/CWZwRRbevJIg==