From 1b30028caf6831c72089d87e5d76497635ed1b8a Mon Sep 17 00:00:00 2001 From: HcgRandon Date: Fri, 6 May 2022 15:52:48 -0400 Subject: [PATCH 1/4] Improved max_concurrency aware ShardManager --- examples/sharding.js | 26 +++++++++++++ lib/Client.js | 2 +- lib/gateway/ShardManager.js | 77 +++++++++++++++++++++++++++---------- 3 files changed, 83 insertions(+), 22 deletions(-) create mode 100644 examples/sharding.js diff --git a/examples/sharding.js b/examples/sharding.js new file mode 100644 index 000000000..2444bc079 --- /dev/null +++ b/examples/sharding.js @@ -0,0 +1,26 @@ +const Eris = require("eris"); + +// Replace TOKEN with your bot account's token +const bot = new Eris("Bot TOKEN", { + firstShardID: 0, + lastShardID: 15, + maxShards: 16, + getAllUsers: false, + intents: ["guilds", "guildMembers", "guildPresences"] +}); + +bot.on("ready", () => { // When the bot is ready + console.log("Ready!"); // Log "Ready!" + console.timeEnd("ready"); +}); + +bot.on("error", (err) => { + console.error(err); // or your preferred logger +}); + +bot.on("shardReady", (id) => { + console.log(`Shard ${id} ready!`); +}); + +console.time("ready"); +bot.connect(); // Get the bot to connect to Discord diff --git a/lib/Client.js b/lib/Client.js index 2dc7264c8..4a5fb0541 100644 --- a/lib/Client.js +++ b/lib/Client.js @@ -445,7 +445,7 @@ class Client extends EventEmitter { } for(let i = this.options.firstShardID; i <= this.options.lastShardID; ++i) { - this.shards.spawn(i); + await this.shards.spawn(i); } } catch(err) { if(!this.options.autoreconnect) { diff --git a/lib/gateway/ShardManager.js b/lib/gateway/ShardManager.js index e5daccfb1..d5fd4e67f 100644 --- a/lib/gateway/ShardManager.js +++ b/lib/gateway/ShardManager.js @@ -2,29 +2,39 @@ const Base = require("../structures/Base"); const Collection = require("../util/Collection"); +const Endpoints = require("../rest/Endpoints"); const Shard = require("./Shard"); +const SHARD_IDENTIFY_RATELIMIT = 5000; // 5s + class ShardManager extends Collection { constructor(client) { super(Shard); this._client = client; this.connectQueue = []; - this.lastConnect = 0; this.connectTimeout = null; + + this.concurrency = null; + this.buckets = new Map(); } - connect(shard) { - if(shard.sessionID || (this.lastConnect <= Date.now() - 5000 && !this.find((shard) => shard.connecting))) { - shard.connect(); - this.lastConnect = Date.now() + 7500; - } else { - this.connectQueue.push(shard); - this.tryConnect(); + async connect(shard) { + // fetch max_concurrency if needed + if(!this.concurrency) { + const gateway = await this._client.requestHandler.request("GET", Endpoints.GATEWAY_BOT, true); + if(gateway.session_start_limit && gateway.session_start_limit.max_concurrency) { + this.concurrency = gateway.session_start_limit.max_concurrency; + } else { + this.concurrency = 1; + } } + + this.connectQueue.push(shard); + this.tryConnect(); } - spawn(id) { + async spawn(id) { let shard = this.get(id); if(!shard) { shard = this.add(new Shard(id, this._client)); @@ -91,27 +101,52 @@ class ShardManager extends Collection { }); } if(shard.status === "disconnected") { - this.connect(shard); + return this.connect(shard); } } tryConnect() { - if(this.connectQueue.length > 0) { - if(this.lastConnect <= Date.now() - 5000) { - const shard = this.connectQueue.shift(); - shard.connect(); - this.lastConnect = Date.now() + 7500; - } else if(!this.connectTimeout) { - this.connectTimeout = setTimeout(() => { - this.connectTimeout = null; - this.tryConnect(); - }, 1000); + // nothing in queue + if(this.connectQueue.length === 0) { + return; + } + + // loop over the connectQueue + for(const shard of this.connectQueue) { + // find the bucket for our shard + const rateLimitKey = (shard.id % this.concurrency) || 0; + const lastConnect = this.buckets.get(rateLimitKey) || 0; + + // has enough time passed since the last connect for this bucket? + // alternatively if we have a sessionID, we can skip this check + if(!shard.sessionID && Date.now() - lastConnect < SHARD_IDENTIFY_RATELIMIT) { + continue; + } + + // Are there any connecting shards in the same bucket we should wait on? + if(this.some((s) => s.connecting && ((s.id % this.concurrency) || 0) === rateLimitKey)) { + continue; } + + // connect the shard + shard.connect(); + this.buckets.set(rateLimitKey, Date.now()); + + // remove the shard from the queue + const index = this.connectQueue.findIndex((s) => s.id === shard.id); + this.connectQueue.splice(index, 1); + } + + // set the next timeout if we have more shards to connect + if(!this.connectTimeout && this.connectQueue.length > 0) { + this.connectTimeout = setTimeout(() => { + this.connectTimeout = null; + this.tryConnect(); + }, 1000); } } _readyPacketCB() { - this.lastConnect = Date.now(); this.tryConnect(); } From 0dafb4e94410e37eeae1c62c4d78a9c6d3cfb5ef Mon Sep 17 00:00:00 2001 From: HcgRandon Date: Fri, 6 May 2022 16:06:39 -0400 Subject: [PATCH 2/4] Update bucket time in readyPacketCB --- lib/gateway/Shard.js | 2 +- lib/gateway/ShardManager.js | 5 ++++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/lib/gateway/Shard.js b/lib/gateway/Shard.js index 9e275724f..5d1dbcdbc 100644 --- a/lib/gateway/Shard.js +++ b/lib/gateway/Shard.js @@ -1854,7 +1854,7 @@ class Shard extends EventEmitter { this.connectTimeout = null; this.status = "ready"; this.presence.status = "online"; - this.client.shards._readyPacketCB(); + this.client.shards._readyPacketCB(this.id); if(packet.t === "RESUMED") { // Can only heartbeat after resume succeeds, discord/discord-api-docs#1619 diff --git a/lib/gateway/ShardManager.js b/lib/gateway/ShardManager.js index d5fd4e67f..8f87bf9f9 100644 --- a/lib/gateway/ShardManager.js +++ b/lib/gateway/ShardManager.js @@ -146,7 +146,10 @@ class ShardManager extends Collection { } } - _readyPacketCB() { + _readyPacketCB(shardID) { + const rateLimitKey = (shardID % this.concurrency) || 0; + this.buckets.set(rateLimitKey, Date.now()); + this.tryConnect(); } From c204a5f7d11b82c9355ba09fff5b449801f89c5d Mon Sep 17 00:00:00 2001 From: HcgRandon Date: Fri, 6 May 2022 16:10:10 -0400 Subject: [PATCH 3/4] hardcode constant as requested --- lib/gateway/ShardManager.js | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/lib/gateway/ShardManager.js b/lib/gateway/ShardManager.js index 8f87bf9f9..fc104fbcc 100644 --- a/lib/gateway/ShardManager.js +++ b/lib/gateway/ShardManager.js @@ -5,8 +5,6 @@ const Collection = require("../util/Collection"); const Endpoints = require("../rest/Endpoints"); const Shard = require("./Shard"); -const SHARD_IDENTIFY_RATELIMIT = 5000; // 5s - class ShardManager extends Collection { constructor(client) { super(Shard); @@ -117,9 +115,9 @@ class ShardManager extends Collection { const rateLimitKey = (shard.id % this.concurrency) || 0; const lastConnect = this.buckets.get(rateLimitKey) || 0; - // has enough time passed since the last connect for this bucket? + // has enough time passed since the last connect for this bucket (5s/bucket)? // alternatively if we have a sessionID, we can skip this check - if(!shard.sessionID && Date.now() - lastConnect < SHARD_IDENTIFY_RATELIMIT) { + if(!shard.sessionID && Date.now() - lastConnect < 5000) { continue; } From f83ed5059ccad1f10c4a35fce0fa73fdbc85b88d Mon Sep 17 00:00:00 2001 From: HcgRandon Date: Thu, 2 Jun 2022 15:43:43 -0400 Subject: [PATCH 4/4] use client.getBotGateway, 500ms between tryConnect --- lib/gateway/ShardManager.js | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/lib/gateway/ShardManager.js b/lib/gateway/ShardManager.js index fc104fbcc..f2cc8c6da 100644 --- a/lib/gateway/ShardManager.js +++ b/lib/gateway/ShardManager.js @@ -2,7 +2,6 @@ const Base = require("../structures/Base"); const Collection = require("../util/Collection"); -const Endpoints = require("../rest/Endpoints"); const Shard = require("./Shard"); class ShardManager extends Collection { @@ -20,7 +19,7 @@ class ShardManager extends Collection { async connect(shard) { // fetch max_concurrency if needed if(!this.concurrency) { - const gateway = await this._client.requestHandler.request("GET", Endpoints.GATEWAY_BOT, true); + const gateway = await this._client.getBotGateway(); if(gateway.session_start_limit && gateway.session_start_limit.max_concurrency) { this.concurrency = gateway.session_start_limit.max_concurrency; } else { @@ -140,7 +139,7 @@ class ShardManager extends Collection { this.connectTimeout = setTimeout(() => { this.connectTimeout = null; this.tryConnect(); - }, 1000); + }, 500); } }