From 683691f762bbf58e3abf3bc67381e18112f5c8ad Mon Sep 17 00:00:00 2001 From: Zeyla Hellyer Date: Sat, 30 Sep 2017 16:07:40 -0700 Subject: [PATCH] Improve shard logic Improve shard logic by more cleanly differentiating when resuming, as well as actually fixing resume logic. For shard runners, better handling of dead clients is added, as well as more use of the shard manager, in that the runner will now more liberally request a restart when required (instead of sitting and doing nothing infinitely). --- src/client/bridge/gateway/shard_runner.rs | 158 ++++++++++------------ src/gateway/mod.rs | 53 +++++++- src/gateway/shard.rs | 70 ++++++---- 3 files changed, 168 insertions(+), 113 deletions(-) diff --git a/src/client/bridge/gateway/shard_runner.rs b/src/client/bridge/gateway/shard_runner.rs index 530e367dc7c..909e5113e25 100644 --- a/src/client/bridge/gateway/shard_runner.rs +++ b/src/client/bridge/gateway/shard_runner.rs @@ -80,7 +80,7 @@ impl ShardRunner { // If the message is to shutdown, first verify the ID so we know // for certain this runner is to shutdown. if let Ok(ShardManagerMessage::Shutdown(id)) = incoming { - if id.0 == shard.shard_info()[0] { + if id.0 == self.shard_info[0] { let _ = shard.shutdown_clean(); return Ok(()); @@ -90,10 +90,7 @@ impl ShardRunner { if let Err(why) = shard.check_heartbeat() { error!("Failed to heartbeat and reconnect: {:?}", why); - let msg = ShardManagerMessage::Restart(ShardId(shard.shard_info()[0])); - let _ = self.manager_tx.send(msg); - - return Ok(()); + return self.request_restart(); } #[cfg(feature = "voice")] @@ -102,21 +99,21 @@ impl ShardRunner { } } - let events = self.recv_events(); + let (event, successful) = self.recv_events(); + + if let Some(event) = event { + dispatch( + event, + &self.shard, + #[cfg(feature = "framework")] + &self.framework, + &self.data, + &self.event_handler, + ); + } - for event in events { - feature_framework! {{ - dispatch(event, - &self.shard, - &self.framework, - &self.data, - &self.event_handler); - } else { - dispatch(event, - &self.shard, - &self.data, - &self.event_handler); - }} + if !successful && !self.shard.lock().stage().is_connecting() { + return self.request_restart(); } } } @@ -125,78 +122,73 @@ impl ShardRunner { self.runner_tx.clone() } - fn recv_events(&mut self) -> Vec { + /// Returns a received event, as well as whether reading the potentially + /// present event was successful. + fn recv_events(&mut self) -> (Option, bool) { let mut shard = self.shard.lock(); - let mut events = vec![]; + let gw_event = match shard.client.recv_json(GatewayEvent::decode) { + Err(Error::WebSocket(WebSocketError::IoError(_))) => { + // Check that an amount of time at least double the + // heartbeat_interval has passed. + // + // If not, continue on trying to receive messages. + // + // If it has, attempt to auto-reconnect. + let last = shard.last_heartbeat_ack(); + let interval = shard.heartbeat_interval(); + + if let (Some(last_heartbeat_ack), Some(interval)) = (last, interval) { + let seconds_passed = last_heartbeat_ack.elapsed().as_secs(); + let interval_in_secs = interval / 1000; - loop { - let gw_event = match shard.client.recv_json(GatewayEvent::decode) { - Err(Error::WebSocket(WebSocketError::IoError(_))) => { - // Check that an amount of time at least double the - // heartbeat_interval has passed. - // - // If not, continue on trying to receive messages. - // - // If it has, attempt to auto-reconnect. - let last = shard.last_heartbeat_ack(); - let interval = shard.heartbeat_interval(); - - if let (Some(last_heartbeat_ack), Some(interval)) = (last, interval) { - let seconds_passed = last_heartbeat_ack.elapsed().as_secs(); - let interval_in_secs = interval / 1000; - - if seconds_passed <= interval_in_secs * 2 { - break; - } - } else { - break; + if seconds_passed <= interval_in_secs * 2 { + return (None, true); } + } else { + return (None, true); + } - debug!("[ShardRunner {:?}] Attempting to auto-reconnect", - self.shard_info); + debug!("Attempting to auto-reconnect"); - if let Err(why) = shard.autoreconnect() { - error!( - "[ShardRunner {:?}] Failed to auto-reconnect: {:?}", - self.shard_info, - why, - ); - } + if let Err(why) = shard.autoreconnect() { + error!("Failed to auto-reconnect: {:?}", why); + } - break; - }, - Err(Error::WebSocket(WebSocketError::NoDataAvailable)) => { - // This is hit when the websocket client dies this will be - // hit every iteration. - break; - }, - other => other, - }; - - let event = match gw_event { - Ok(Some(event)) => Ok(event), - Ok(None) => break, - Err(why) => Err(why), - }; - - let event = match shard.handle_event(event) { - Ok(Some(event)) => event, - Ok(None) => continue, - Err(why) => { - error!("Shard handler received err: {:?}", why); - - continue; - }, - }; - - events.push(event); - - if events.len() > 5 { - break; - } + return (None, true); + }, + Err(Error::WebSocket(WebSocketError::NoDataAvailable)) => { + // This is hit when the websocket client dies this will be + // hit every iteration. + return (None, false); + }, + other => other, }; - events + let event = match gw_event { + Ok(Some(event)) => Ok(event), + Ok(None) => return (None, true), + Err(why) => Err(why), + }; + + let event = match shard.handle_event(event) { + Ok(Some(event)) => event, + Ok(None) => return (None, true), + Err(why) => { + error!("Shard handler received err: {:?}", why); + + return (None, true); + }, + }; + + (Some(event), true) + } + + fn request_restart(&self) -> Result<()> { + debug!("[ShardRunner {:?}] Requesting restart", self.shard_info); + let msg = ShardManagerMessage::Restart(ShardId(self.shard_info[0])); + let _ = self.manager_tx.send(msg); + + Ok(()) } } diff --git a/src/gateway/mod.rs b/src/gateway/mod.rs index 6f839dbf8fb..bd9c45b03ed 100644 --- a/src/gateway/mod.rs +++ b/src/gateway/mod.rs @@ -60,7 +60,7 @@ pub use self::shard::Shard; /// This can be useful for knowing which shards are currently "down"/"up". /// /// [`Shard`]: struct.Shard.html -#[derive(Debug, Eq, PartialEq, PartialOrd, Ord)] +#[derive(Clone, Debug, Eq, PartialEq, PartialOrd, Ord)] pub enum ConnectionStage { /// Indicator that the [`Shard`] is normally connected and is not in, e.g., /// a resume phase. @@ -83,5 +83,56 @@ pub enum ConnectionStage { Handshake, /// Indicator that the [`Shard`] has sent an IDENTIFY packet and is awaiting /// a READY packet. + /// + /// [`Shard`]: struct.Shard.html Identifying, + /// Indicator that the [`Shard`] has sent a RESUME packet and is awaiting a + /// RESUMED packet. + /// + /// [`Shard`]: struct.Shard.html + Resuming, +} + +impl ConnectionStage { + /// Whether the stage is a form of connecting. + /// + /// This will return `true` on: + /// + /// - [`Connecting`][`ConnectionStage::Connecting`] + /// - [`Handshake`][`ConnectionStage::Handshake`] + /// - [`Identifying`][`ConnectionStage::Identifying`] + /// - [`Resuming`][`ConnectionStage::Resuming`] + /// + /// All other variants will return `false`. + /// + /// # Examples + /// + /// Assert that [`ConnectionStage::Identifying`] is a connecting stage: + /// + /// ```rust + /// use serenity::gateway::ConnectionStage; + /// + /// assert!(ConnectionStage::Identifying.is_connecting()); + /// ``` + /// + /// Assert that [`ConnectionStage::Connected`] is _not_ a connecting stage: + /// + /// ```rust + /// use serenity::gateway::ConnectionStage; + /// + /// assert!(!ConnectionStage::Connected.is_connecting()); + /// ``` + /// + /// [`ConnectionStage::Connecting`]: #variant.Connecting + /// [`ConnectionStage::Handshake`]: #variant.Handshake + /// [`ConnectionStage::Identifying`]: #variant.Identifying + /// [`ConnectionStage::Resuming`]: #variant.Resuming + pub fn is_connecting(&self) -> bool { + use self::ConnectionStage::*; + + *self == Connecting + || *self == Handshake + || *self == Identifying + || *self == Resuming + } } diff --git a/src/gateway/shard.rs b/src/gateway/shard.rs index e6172552164..de5104e5950 100644 --- a/src/gateway/shard.rs +++ b/src/gateway/shard.rs @@ -136,7 +136,7 @@ impl Shard { let stage = ConnectionStage::Handshake; let session_id = None; - let mut shard = feature_voice! { + Ok(feature_voice! { { let (tx, rx) = mpsc::channel(); @@ -172,11 +172,7 @@ impl Shard { ws_url, } } - }; - - shard.identify()?; - - Ok(shard) + }) } /// Retrieves a copy of the current shard information. @@ -308,6 +304,11 @@ impl Shard { self.update_presence(); } + /// Returns the current connection stage of the shard. + pub fn stage(&self) -> ConnectionStage { + self.stage.clone() + } + /// Handles an event from the gateway over the receiver, requiring the /// receiver to be passed if a reconnect needs to occur. /// @@ -414,14 +415,16 @@ impl Shard { self.shard_info, interval); + if self.stage == ConnectionStage::Resuming { + return Ok(None); + } + if interval > 0 { self.heartbeat_interval = Some(interval); } if self.stage == ConnectionStage::Handshake { - self.stage = ConnectionStage::Identifying; - - Ok(None) + self.identify().and(Ok(None)) } else { debug!("[Shard {:?}] Received late Hello; autoreconnecting", self.shard_info); @@ -438,9 +441,7 @@ impl Shard { self.seq = 0; self.session_id = None; - self.identify()?; - - Ok(None) + self.identify().and(Ok(None)) }, Ok(GatewayEvent::Reconnect) => self.reconnect().and(Ok(None)), Err(Error::Gateway(GatewayError::Closed(data))) => { @@ -448,18 +449,6 @@ impl Shard { let reason = data.map(|d| d.reason); let clean = num == Some(1000); - { - let kind = if clean { "Cleanly" } else { "Uncleanly" }; - - info!( - "[Shard {:?}] {} closing with {:?}: {:?}", - self.shard_info, - kind, - num, - reason - ); - } - match num { Some(close_codes::UNKNOWN_OPCODE) => { warn!("[Shard {:?}] Sent invalid opcode", @@ -524,9 +513,9 @@ impl Shard { } let resume = num.map(|x| { - x != 1000 && x != close_codes::AUTHENTICATION_FAILED && + x != close_codes::AUTHENTICATION_FAILED && self.session_id.is_some() - }).unwrap_or(false); + }).unwrap_or(true); if resume { self.resume().or_else(|_| self.reconnect()).and(Ok(None)) @@ -878,11 +867,15 @@ impl Shard { /// Retrieves the `heartbeat_interval`. #[inline] - pub(crate) fn heartbeat_interval(&self) -> Option { self.heartbeat_interval } + pub(crate) fn heartbeat_interval(&self) -> Option { + self.heartbeat_interval + } /// Retrieves the value of when the last heartbeat ack was received. #[inline] - pub(crate) fn last_heartbeat_ack(&self) -> Option { self.heartbeat_instants.1 } + pub(crate) fn last_heartbeat_ack(&self) -> Option { + self.heartbeat_instants.1 + } fn reconnect(&mut self) -> Result<()> { info!("[Shard {:?}] Attempting to reconnect", self.shard_info); @@ -902,6 +895,9 @@ impl Shard { fn resume(&mut self) -> Result<()> { debug!("Shard {:?}] Attempting to resume", self.shard_info); + self.initialize()?; + self.stage = ConnectionStage::Resuming; + self.send_resume().or_else(|why| { warn!("[Shard {:?}] Err sending resume: {:?}", self.shard_info, @@ -931,11 +927,26 @@ impl Shard { })) } + /// Initializes a new WebSocket client. + /// + /// This will set the stage of the shard before and after instantiation of + /// the client. fn initialize(&mut self) -> Result<()> { + debug!("[Shard {:?}] Initializing", self.shard_info); + + // We need to do two, sort of three things here: + // + // - set the stage of the shard as opening the websocket connection + // - open the websocket connection + // - if successful, set the current stage as Handshaking + // + // This is used to accurately assess whether the state of the shard is + // accurate when a Hello is received. self.stage = ConnectionStage::Connecting; self.client = connect(&self.ws_url.lock().unwrap())?; + self.stage = ConnectionStage::Handshake; - self.identify() + Ok(()) } fn identify(&mut self) -> Result<()> { @@ -956,6 +967,7 @@ impl Shard { }); self.heartbeat_instants.0 = Some(Instant::now()); + self.stage = ConnectionStage::Identifying; debug!("[Shard {:?}] Identifying", self.shard_info);