diff --git a/src/client/bridge/gateway/shard_manager.rs b/src/client/bridge/gateway/shard_manager.rs index fbfd2f62160..99cdec783a7 100644 --- a/src/client/bridge/gateway/shard_manager.rs +++ b/src/client/bridge/gateway/shard_manager.rs @@ -1,6 +1,6 @@ use internal::prelude::*; use parking_lot::Mutex; -use std::collections::HashMap; +use std::collections::{HashMap, VecDeque}; use std::sync::mpsc::{self, Sender}; use std::sync::Arc; use std::thread; @@ -135,6 +135,7 @@ impl ShardManager { framework: Arc::clone(&framework), last_start: None, manager_tx: thread_tx.clone(), + queue: VecDeque::new(), runners: Arc::clone(&runners), rx: shard_queue_rx, token: Arc::clone(&token), @@ -184,6 +185,7 @@ impl ShardManager { event_handler: Arc::clone(&event_handler), last_start: None, manager_tx: thread_tx.clone(), + queue: VecDeque::new(), runners: Arc::clone(&runners), rx: shard_queue_rx, token: Arc::clone(&token), diff --git a/src/client/bridge/gateway/shard_queuer.rs b/src/client/bridge/gateway/shard_queuer.rs index bd02532eafd..30a09064e50 100644 --- a/src/client/bridge/gateway/shard_queuer.rs +++ b/src/client/bridge/gateway/shard_queuer.rs @@ -1,8 +1,8 @@ use gateway::Shard; use internal::prelude::*; use parking_lot::Mutex; -use std::collections::HashMap; -use std::sync::mpsc::{Receiver, Sender}; +use std::collections::{HashMap, VecDeque}; +use std::sync::mpsc::{Receiver, RecvTimeoutError, Sender}; use std::sync::Arc; use std::thread; use std::time::{Duration, Instant}; @@ -20,6 +20,8 @@ use typemap::ShareMap; #[cfg(feature = "framework")] use framework::Framework; +const WAIT_BETWEEN_BOOTS_IN_SECONDS: u64 = 5; + /// The shard queuer is a simple loop that runs indefinitely to manage the /// startup of shards. /// @@ -49,6 +51,10 @@ pub struct ShardQueuer { /// /// [`ShardManagerMonitor`]: struct.ShardManagerMonitor.html pub manager_tx: Sender, + /// The shards that are queued for booting. + /// + /// This will typically be filled with previously failed boots. + pub queue: VecDeque<(u64, u64)>, /// A copy of the map of shard runners. pub runners: Arc>>, /// A receiver channel for the shard queuer to be told to start shards. @@ -91,18 +97,27 @@ impl ShardQueuer { /// [`ShardQueuerMessage::Start`]: enum.ShardQueuerMessage.html#variant.Start /// [`rx`]: #structfield.rx pub fn run(&mut self) { - while let Ok(msg) = self.rx.recv() { - match msg { - ShardQueuerMessage::Shutdown => break, - ShardQueuerMessage::Start(shard_id, shard_total) => { - self.check_last_start(); - - if let Err(why) = self.start(shard_id, shard_total) { - warn!("Err starting shard {}: {:?}", shard_id, why); - } - - self.last_start = Some(Instant::now()); + // The duration to timeout from reads over the Rx channel. This can be + // done in a loop, and if the read times out then a shard can be + // started if one is presently waiting in the queue. + let wait_duration = Duration::from_secs(WAIT_BETWEEN_BOOTS_IN_SECONDS); + + loop { + match self.rx.recv_timeout(wait_duration) { + Ok(ShardQueuerMessage::Shutdown) => break, + Ok(ShardQueuerMessage::Start(id, total)) => { + self.checked_start(id.0, total.0); + }, + Err(RecvTimeoutError::Disconnected) => { + // If the sender half has disconnected then the queuer's + // lifespan has passed and can shutdown. + break; }, + Err(RecvTimeoutError::Timeout) => { + if let Some((id, total)) = self.queue.pop_front() { + self.checked_start(id, total); + } + } } } } @@ -115,7 +130,7 @@ impl ShardQueuer { // We must wait 5 seconds between IDENTIFYs to avoid session // invalidations. - let duration = Duration::from_secs(5); + let duration = Duration::from_secs(WAIT_BETWEEN_BOOTS_IN_SECONDS); let elapsed = instant.elapsed(); if elapsed >= duration { @@ -127,8 +142,21 @@ impl ShardQueuer { thread::sleep(to_sleep); } - fn start(&mut self, shard_id: ShardId, shard_total: ShardId) -> Result<()> { - let shard_info = [shard_id.0, shard_total.0]; + fn checked_start(&mut self, id: u64, total: u64) { + self.check_last_start(); + + if let Err(why) = self.start(id, total) { + warn!("Err starting shard {}: {:?}", id, why); + info!("Re-queueing start of shard {}", id); + + self.queue.push_back((id, total)); + } + + self.last_start = Some(Instant::now()); + } + + fn start(&mut self, shard_id: u64, shard_total: u64) -> Result<()> { + let shard_info = [shard_id, shard_total]; let shard = Shard::new( Arc::clone(&self.ws_url), @@ -163,7 +191,7 @@ impl ShardQueuer { let _ = runner.run(); }); - self.runners.lock().insert(shard_id, runner_info); + self.runners.lock().insert(ShardId(shard_id), runner_info); Ok(()) }