Skip to content

Commit

Permalink
Improve shard logic
Browse files Browse the repository at this point in the history
Improve shard logic by more cleanly differentiating when resuming, as
well as actually fixing resume logic.

For shard runners, better handling of dead clients is added, as well as
more use of the shard manager, in that the runner will now more
liberally request a restart when required (instead of sitting and doing
nothing infinitely).
  • Loading branch information
Zeyla Hellyer committed Sep 30, 2017
1 parent 21e194b commit 683691f
Show file tree
Hide file tree
Showing 3 changed files with 168 additions and 113 deletions.
158 changes: 75 additions & 83 deletions src/client/bridge/gateway/shard_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ impl<H: EventHandler + 'static> ShardRunner<H> {
// If the message is to shutdown, first verify the ID so we know
// for certain this runner is to shutdown.
if let Ok(ShardManagerMessage::Shutdown(id)) = incoming {
if id.0 == shard.shard_info()[0] {
if id.0 == self.shard_info[0] {
let _ = shard.shutdown_clean();

return Ok(());
Expand All @@ -90,10 +90,7 @@ impl<H: EventHandler + 'static> ShardRunner<H> {
if let Err(why) = shard.check_heartbeat() {
error!("Failed to heartbeat and reconnect: {:?}", why);

let msg = ShardManagerMessage::Restart(ShardId(shard.shard_info()[0]));
let _ = self.manager_tx.send(msg);

return Ok(());
return self.request_restart();
}

#[cfg(feature = "voice")]
Expand All @@ -102,21 +99,21 @@ impl<H: EventHandler + 'static> ShardRunner<H> {
}
}

let events = self.recv_events();
let (event, successful) = self.recv_events();

if let Some(event) = event {
dispatch(
event,
&self.shard,
#[cfg(feature = "framework")]
&self.framework,
&self.data,
&self.event_handler,
);
}

for event in events {
feature_framework! {{
dispatch(event,
&self.shard,
&self.framework,
&self.data,
&self.event_handler);
} else {
dispatch(event,
&self.shard,
&self.data,
&self.event_handler);
}}
if !successful && !self.shard.lock().stage().is_connecting() {
return self.request_restart();
}
}
}
Expand All @@ -125,78 +122,73 @@ impl<H: EventHandler + 'static> ShardRunner<H> {
self.runner_tx.clone()
}

fn recv_events(&mut self) -> Vec<Event> {
/// Returns a received event, as well as whether reading the potentially
/// present event was successful.
fn recv_events(&mut self) -> (Option<Event>, bool) {
let mut shard = self.shard.lock();

let mut events = vec![];
let gw_event = match shard.client.recv_json(GatewayEvent::decode) {
Err(Error::WebSocket(WebSocketError::IoError(_))) => {
// Check that an amount of time at least double the
// heartbeat_interval has passed.
//
// If not, continue on trying to receive messages.
//
// If it has, attempt to auto-reconnect.
let last = shard.last_heartbeat_ack();
let interval = shard.heartbeat_interval();

if let (Some(last_heartbeat_ack), Some(interval)) = (last, interval) {
let seconds_passed = last_heartbeat_ack.elapsed().as_secs();
let interval_in_secs = interval / 1000;

loop {
let gw_event = match shard.client.recv_json(GatewayEvent::decode) {
Err(Error::WebSocket(WebSocketError::IoError(_))) => {
// Check that an amount of time at least double the
// heartbeat_interval has passed.
//
// If not, continue on trying to receive messages.
//
// If it has, attempt to auto-reconnect.
let last = shard.last_heartbeat_ack();
let interval = shard.heartbeat_interval();

if let (Some(last_heartbeat_ack), Some(interval)) = (last, interval) {
let seconds_passed = last_heartbeat_ack.elapsed().as_secs();
let interval_in_secs = interval / 1000;

if seconds_passed <= interval_in_secs * 2 {
break;
}
} else {
break;
if seconds_passed <= interval_in_secs * 2 {
return (None, true);
}
} else {
return (None, true);
}

debug!("[ShardRunner {:?}] Attempting to auto-reconnect",
self.shard_info);
debug!("Attempting to auto-reconnect");

if let Err(why) = shard.autoreconnect() {
error!(
"[ShardRunner {:?}] Failed to auto-reconnect: {:?}",
self.shard_info,
why,
);
}
if let Err(why) = shard.autoreconnect() {
error!("Failed to auto-reconnect: {:?}", why);
}

break;
},
Err(Error::WebSocket(WebSocketError::NoDataAvailable)) => {
// This is hit when the websocket client dies this will be
// hit every iteration.
break;
},
other => other,
};

let event = match gw_event {
Ok(Some(event)) => Ok(event),
Ok(None) => break,
Err(why) => Err(why),
};

let event = match shard.handle_event(event) {
Ok(Some(event)) => event,
Ok(None) => continue,
Err(why) => {
error!("Shard handler received err: {:?}", why);

continue;
},
};

events.push(event);

if events.len() > 5 {
break;
}
return (None, true);
},
Err(Error::WebSocket(WebSocketError::NoDataAvailable)) => {
// This is hit when the websocket client dies this will be
// hit every iteration.
return (None, false);
},
other => other,
};

events
let event = match gw_event {
Ok(Some(event)) => Ok(event),
Ok(None) => return (None, true),
Err(why) => Err(why),
};

let event = match shard.handle_event(event) {
Ok(Some(event)) => event,
Ok(None) => return (None, true),
Err(why) => {
error!("Shard handler received err: {:?}", why);

return (None, true);
},
};

(Some(event), true)
}

fn request_restart(&self) -> Result<()> {
debug!("[ShardRunner {:?}] Requesting restart", self.shard_info);
let msg = ShardManagerMessage::Restart(ShardId(self.shard_info[0]));
let _ = self.manager_tx.send(msg);

Ok(())
}
}
53 changes: 52 additions & 1 deletion src/gateway/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ pub use self::shard::Shard;
/// This can be useful for knowing which shards are currently "down"/"up".
///
/// [`Shard`]: struct.Shard.html
#[derive(Debug, Eq, PartialEq, PartialOrd, Ord)]
#[derive(Clone, Debug, Eq, PartialEq, PartialOrd, Ord)]
pub enum ConnectionStage {
/// Indicator that the [`Shard`] is normally connected and is not in, e.g.,
/// a resume phase.
Expand All @@ -83,5 +83,56 @@ pub enum ConnectionStage {
Handshake,
/// Indicator that the [`Shard`] has sent an IDENTIFY packet and is awaiting
/// a READY packet.
///
/// [`Shard`]: struct.Shard.html
Identifying,
/// Indicator that the [`Shard`] has sent a RESUME packet and is awaiting a
/// RESUMED packet.
///
/// [`Shard`]: struct.Shard.html
Resuming,
}

impl ConnectionStage {
/// Whether the stage is a form of connecting.
///
/// This will return `true` on:
///
/// - [`Connecting`][`ConnectionStage::Connecting`]
/// - [`Handshake`][`ConnectionStage::Handshake`]
/// - [`Identifying`][`ConnectionStage::Identifying`]
/// - [`Resuming`][`ConnectionStage::Resuming`]
///
/// All other variants will return `false`.
///
/// # Examples
///
/// Assert that [`ConnectionStage::Identifying`] is a connecting stage:
///
/// ```rust
/// use serenity::gateway::ConnectionStage;
///
/// assert!(ConnectionStage::Identifying.is_connecting());
/// ```
///
/// Assert that [`ConnectionStage::Connected`] is _not_ a connecting stage:
///
/// ```rust
/// use serenity::gateway::ConnectionStage;
///
/// assert!(!ConnectionStage::Connected.is_connecting());
/// ```
///
/// [`ConnectionStage::Connecting`]: #variant.Connecting
/// [`ConnectionStage::Handshake`]: #variant.Handshake
/// [`ConnectionStage::Identifying`]: #variant.Identifying
/// [`ConnectionStage::Resuming`]: #variant.Resuming
pub fn is_connecting(&self) -> bool {
use self::ConnectionStage::*;

*self == Connecting
|| *self == Handshake
|| *self == Identifying
|| *self == Resuming
}
}
Loading

0 comments on commit 683691f

Please sign in to comment.