Skip to content

Commit

Permalink
Attempt to restart failed shard boots
Browse files Browse the repository at this point in the history
When the ShardQueuer fails to restart a shard (such as due to a network
error, an issue on Discord's side, Cloudflare, etc.), it will now push
the ID onto a queue.

The queuer now waits up to 5 seconds for a message from the receiver, and
if none arrives before the timeout, it starts the next queued shard (if
one is queued).

This should fix a number of reconnection issues.
  • Loading branch information
Zeyla Hellyer committed Dec 16, 2017
1 parent bcd16dd commit 8d68503
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 18 deletions.
4 changes: 3 additions & 1 deletion src/client/bridge/gateway/shard_manager.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use internal::prelude::*;
use parking_lot::Mutex;
use std::collections::HashMap;
use std::collections::{HashMap, VecDeque};
use std::sync::mpsc::{self, Sender};
use std::sync::Arc;
use std::thread;
Expand Down Expand Up @@ -135,6 +135,7 @@ impl ShardManager {
framework: Arc::clone(&framework),
last_start: None,
manager_tx: thread_tx.clone(),
queue: VecDeque::new(),
runners: Arc::clone(&runners),
rx: shard_queue_rx,
token: Arc::clone(&token),
Expand Down Expand Up @@ -184,6 +185,7 @@ impl ShardManager {
event_handler: Arc::clone(&event_handler),
last_start: None,
manager_tx: thread_tx.clone(),
queue: VecDeque::new(),
runners: Arc::clone(&runners),
rx: shard_queue_rx,
token: Arc::clone(&token),
Expand Down
62 changes: 45 additions & 17 deletions src/client/bridge/gateway/shard_queuer.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use gateway::Shard;
use internal::prelude::*;
use parking_lot::Mutex;
use std::collections::HashMap;
use std::sync::mpsc::{Receiver, Sender};
use std::collections::{HashMap, VecDeque};
use std::sync::mpsc::{Receiver, RecvTimeoutError, Sender};
use std::sync::Arc;
use std::thread;
use std::time::{Duration, Instant};
Expand All @@ -20,6 +20,8 @@ use typemap::ShareMap;
#[cfg(feature = "framework")]
use framework::Framework;

/// Number of seconds to wait between shard boots.
///
/// Used both as the minimum delay between IDENTIFYs and as the timeout when
/// reading from the queuer's receiver channel.
const WAIT_BETWEEN_BOOTS_IN_SECONDS: u64 = 5;

/// The shard queuer is a simple loop that runs indefinitely to manage the
/// startup of shards.
///
Expand Down Expand Up @@ -49,6 +51,10 @@ pub struct ShardQueuer<H: EventHandler + Send + Sync + 'static> {
///
/// [`ShardManagerMonitor`]: struct.ShardManagerMonitor.html
pub manager_tx: Sender<ShardManagerMessage>,
/// The shards that are queued for booting.
///
/// This will typically be filled with previously failed boots.
pub queue: VecDeque<(u64, u64)>,
/// A copy of the map of shard runners.
pub runners: Arc<Mutex<HashMap<ShardId, ShardRunnerInfo>>>,
/// A receiver channel for the shard queuer to be told to start shards.
Expand Down Expand Up @@ -91,18 +97,27 @@ impl<H: EventHandler + Send + Sync + 'static> ShardQueuer<H> {
/// [`ShardQueuerMessage::Start`]: enum.ShardQueuerMessage.html#variant.Start
/// [`rx`]: #structfield.rx
pub fn run(&mut self) {
while let Ok(msg) = self.rx.recv() {
match msg {
ShardQueuerMessage::Shutdown => break,
ShardQueuerMessage::Start(shard_id, shard_total) => {
self.check_last_start();

if let Err(why) = self.start(shard_id, shard_total) {
warn!("Err starting shard {}: {:?}", shard_id, why);
}

self.last_start = Some(Instant::now());
// The duration to timeout from reads over the Rx channel. This can be
// done in a loop, and if the read times out then a shard can be
// started if one is presently waiting in the queue.
let wait_duration = Duration::from_secs(WAIT_BETWEEN_BOOTS_IN_SECONDS);

loop {
match self.rx.recv_timeout(wait_duration) {
Ok(ShardQueuerMessage::Shutdown) => break,
Ok(ShardQueuerMessage::Start(id, total)) => {
self.checked_start(id.0, total.0);
},
Err(RecvTimeoutError::Disconnected) => {
// If the sender half has disconnected then the queuer's
// lifespan has passed and can shutdown.
break;
},
Err(RecvTimeoutError::Timeout) => {
if let Some((id, total)) = self.queue.pop_front() {
self.checked_start(id, total);
}
}
}
}
}
Expand All @@ -115,7 +130,7 @@ impl<H: EventHandler + Send + Sync + 'static> ShardQueuer<H> {

// We must wait 5 seconds between IDENTIFYs to avoid session
// invalidations.
let duration = Duration::from_secs(5);
let duration = Duration::from_secs(WAIT_BETWEEN_BOOTS_IN_SECONDS);
let elapsed = instant.elapsed();

if elapsed >= duration {
Expand All @@ -127,8 +142,21 @@ impl<H: EventHandler + Send + Sync + 'static> ShardQueuer<H> {
thread::sleep(to_sleep);
}

fn start(&mut self, shard_id: ShardId, shard_total: ShardId) -> Result<()> {
let shard_info = [shard_id.0, shard_total.0];
/// Boots the shard `id` (out of `total` shards), re-queueing it on failure.
///
/// `check_last_start` is called first so that the required delay since the
/// previous boot is respected. If the boot fails, the error is logged and
/// the `(id, total)` pair is pushed to the back of `queue`, so that the
/// `run` loop can retry it once a read from the receiver times out.
///
/// `last_start` is refreshed whether or not the boot succeeded.
fn checked_start(&mut self, id: u64, total: u64) {
    self.check_last_start();

    match self.start(id, total) {
        Ok(_) => {},
        Err(why) => {
            warn!("Err starting shard {}: {:?}", id, why);
            info!("Re-queueing start of shard {}", id);

            // Keep the failed boot around for a later retry.
            self.queue.push_back((id, total));
        },
    }

    // Failed attempts count as a boot for spacing purposes too.
    self.last_start = Some(Instant::now());
}

fn start(&mut self, shard_id: u64, shard_total: u64) -> Result<()> {
let shard_info = [shard_id, shard_total];

let shard = Shard::new(
Arc::clone(&self.ws_url),
Expand Down Expand Up @@ -163,7 +191,7 @@ impl<H: EventHandler + Send + Sync + 'static> ShardQueuer<H> {
let _ = runner.run();
});

self.runners.lock().insert(shard_id, runner_info);
self.runners.lock().insert(ShardId(shard_id), runner_info);

Ok(())
}
Expand Down

0 comments on commit 8d68503

Please sign in to comment.