Skip to content

Commit

Permalink
fix(ShardManager): refactor concurrency check, add option (#1382)
Browse files Browse the repository at this point in the history
  • Loading branch information
abalabahaha authored Jun 5, 2022
1 parent 162d4ef commit 05a932a
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 27 deletions.
8 changes: 6 additions & 2 deletions index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,7 @@ declare namespace Eris {
rest?: RequestHandlerOptions;
restMode?: boolean;
seedVoiceConnections?: boolean;
shardConcurrency?: number | "auto";
ws?: unknown;
}
interface CommandClientOptions {
Expand Down Expand Up @@ -824,6 +825,9 @@ declare namespace Eris {
res: (value: Member[]) => void;
timeout: NodeJS.Timeout;
}
interface ShardManagerOptions {
concurrency?: number | "auto";
}

// Guild
interface CreateGuildOptions {
Expand Down Expand Up @@ -3378,10 +3382,10 @@ declare namespace Eris {
}

export class ShardManager extends Collection<Shard> implements SimpleJSON {
buckets: Map<number, number>;
connectQueue: Shard[];
connectTimeout: NodeJS.Timer | null;
lastConnect: number;
constructor(client: Client);
constructor(client: Client, options: ShardManagerOptions);
connect(shard: Shard): void;
spawn(id: number): void;
tryConnect(): void;
Expand Down
19 changes: 15 additions & 4 deletions lib/Client.js
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ class Client extends EventEmitter {
* @arg {Number} [options.rest.requestTimeout=15000] A number of milliseconds before REST requests are considered timed out
* @arg {Boolean} [options.restMode=false] Whether to enable getting objects over REST. Even with this option enabled, it is recommended that you check the cache first before using REST
* @arg {Boolean} [options.seedVoiceConnections=false] Whether to populate bot.voiceConnections with existing connections the bot account has during startup. Note that this will disconnect connections from other bot sessions
* @arg {Number | String} [options.shardConcurrency="auto"] The number of shards that can start simultaneously. If "auto" Eris will use Discord's recommended shard concurrency.
* @arg {Object} [options.ws] An object of WebSocket options to pass to the shard WebSocket constructors
*/
constructor(token, options) {
Expand Down Expand Up @@ -149,6 +150,7 @@ class Client extends EventEmitter {
rest: {},
restMode: false,
seedVoiceConnections: false,
shardConcurrency: "auto",
ws: {},
reconnectDelay: (lastDelay, attempts) => Math.pow(attempts + 1, 0.7) * 20000
}, options);
Expand Down Expand Up @@ -202,13 +204,18 @@ class Client extends EventEmitter {
this.requestHandler = new RequestHandler(this, this.options.rest);
delete this.options.rest;

const shardManagerOptions = {};
if(typeof this.options.shardConcurrency === "number") {
shardManagerOptions.concurrency = this.options.shardConcurrency;
}
this.shards = new ShardManager(this, shardManagerOptions);

this.ready = false;
this.bot = this._token.startsWith("Bot ");
this.startTime = 0;
this.lastConnect = 0;
this.channelGuildMap = {};
this.threadGuildMap = {};
this.shards = new ShardManager(this);
this.groupChannels = new Collection(GroupChannel);
this.guilds = new Collection(Guild);
this.privateChannelMap = {};
Expand Down Expand Up @@ -413,12 +420,12 @@ class Client extends EventEmitter {
}

/**
* Tells all shards to connect.
* Tells all shards to connect. This will call `getBotGateway()`, which is ratelimited.
* @returns {Promise} Resolves when all shards are initialized
*/
async connect() {
try {
const data = await (this.options.maxShards === "auto" ? this.getBotGateway() : this.getGateway());
const data = await (this.options.maxShards === "auto" || (this.options.shardConcurrency === "auto" && this.bot) ? this.getBotGateway() : this.getGateway());
if(!data.url || (this.options.maxShards === "auto" && !data.shards)) {
throw new Error("Invalid response from gateway REST call");
}
Expand All @@ -444,8 +451,12 @@ class Client extends EventEmitter {
}
}

if(this.options.shardConcurrency === "auto" && typeof data.session_start_limit?.max_concurrency === "number") {
this.shards.setConcurrency(data.session_start_limit.max_concurrency);
}

for(let i = this.options.firstShardID; i <= this.options.lastShardID; ++i) {
await this.shards.spawn(i);
this.shards.spawn(i);
}
} catch(err) {
if(!this.options.autoreconnect) {
Expand Down
39 changes: 18 additions & 21 deletions lib/gateway/ShardManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,33 +5,29 @@ const Collection = require("../util/Collection");
const Shard = require("./Shard");

class ShardManager extends Collection {
constructor(client) {
constructor(client, options = {}) {
super(Shard);
this._client = client;

this.connectQueue = [];
this.connectTimeout = null;
this.options = Object.assign({
concurrency: 1
}, options);

this.concurrency = null;
this.buckets = new Map();
this.connectQueue = [];
this.connectTimeout = null;
}

async connect(shard) {
// fetch max_concurrency if needed
if(!this.concurrency) {
const gateway = await this._client.getBotGateway();
if(gateway.session_start_limit && gateway.session_start_limit.max_concurrency) {
this.concurrency = gateway.session_start_limit.max_concurrency;
} else {
this.concurrency = 1;
}
}

connect(shard) {
this.connectQueue.push(shard);
this.tryConnect();
}

async spawn(id) {
setConcurrency(concurrency) {
this.options.concurrency = concurrency;
}

spawn(id) {
let shard = this.get(id);
if(!shard) {
shard = this.add(new Shard(id, this._client));
Expand Down Expand Up @@ -111,7 +107,7 @@ class ShardManager extends Collection {
// loop over the connectQueue
for(const shard of this.connectQueue) {
// find the bucket for our shard
const rateLimitKey = (shard.id % this.concurrency) || 0;
const rateLimitKey = (shard.id % this.options.concurrency) || 0;
const lastConnect = this.buckets.get(rateLimitKey) || 0;

// has enough time passed since the last connect for this bucket (5s/bucket)?
Expand All @@ -121,7 +117,7 @@ class ShardManager extends Collection {
}

// Are there any connecting shards in the same bucket we should wait on?
if(this.some((s) => s.connecting && ((s.id % this.concurrency) || 0) === rateLimitKey)) {
if(this.some((s) => s.connecting && ((s.id % this.options.concurrency) || 0) === rateLimitKey)) {
continue;
}

Expand All @@ -144,7 +140,7 @@ class ShardManager extends Collection {
}

_readyPacketCB(shardID) {
const rateLimitKey = (shardID % this.concurrency) || 0;
const rateLimitKey = (shardID % this.options.concurrency) || 0;
this.buckets.set(rateLimitKey, Date.now());

this.tryConnect();
Expand All @@ -156,9 +152,10 @@ class ShardManager extends Collection {

toJSON(props = []) {
return Base.prototype.toJSON.call(this, [
"buckets",
"connectQueue",
"lastConnect",
"connectionTimeout",
"connectTimeout",
"options",
...props
]);
}
Expand Down

0 comments on commit 05a932a

Please sign in to comment.