Skip to content

Commit

Permalink
Improve shard and shard runner logging
Browse files Browse the repository at this point in the history
  • Loading branch information
Zeyla Hellyer committed Sep 30, 2017
1 parent 17838b5 commit 21e194b
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 60 deletions.
22 changes: 19 additions & 3 deletions src/client/bridge/gateway/shard_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ pub struct ShardRunner<H: EventHandler + 'static> {
runner_rx: Receiver<ShardManagerMessage>,
runner_tx: Sender<ShardManagerMessage>,
shard: LockedShard,
shard_info: [u64; 2],
}

impl<H: EventHandler + 'static> ShardRunner<H> {
Expand All @@ -33,6 +34,7 @@ impl<H: EventHandler + 'static> ShardRunner<H> {
data: Arc<ParkingLotMutex<ShareMap>>,
event_handler: Arc<H>) -> Self {
let (tx, rx) = mpsc::channel();
let shard_info = shard.lock().shard_info();

Self {
runner_rx: rx,
Expand All @@ -42,6 +44,7 @@ impl<H: EventHandler + 'static> ShardRunner<H> {
framework,
manager_tx,
shard,
shard_info,
}
}

Expand All @@ -51,6 +54,7 @@ impl<H: EventHandler + 'static> ShardRunner<H> {
data: Arc<ParkingLotMutex<ShareMap>>,
event_handler: Arc<H>) -> Self {
let (tx, rx) = mpsc::channel();
let shard_info = shard.lock().shard_info();

Self {
runner_rx: rx,
Expand All @@ -59,10 +63,13 @@ impl<H: EventHandler + 'static> ShardRunner<H> {
event_handler,
manager_tx,
shard,
shard_info,
}
}

pub fn run(&mut self) -> Result<()> {
debug!("[ShardRunner {:?}] Running", self.shard_info);

loop {
{
let mut shard = self.shard.lock();
Expand Down Expand Up @@ -146,15 +153,24 @@ impl<H: EventHandler + 'static> ShardRunner<H> {
break;
}

debug!("Attempting to auto-reconnect");
debug!("[ShardRunner {:?}] Attempting to auto-reconnect",
self.shard_info);

if let Err(why) = shard.autoreconnect() {
error!("Failed to auto-reconnect: {:?}", why);
error!(
"[ShardRunner {:?}] Failed to auto-reconnect: {:?}",
self.shard_info,
why,
);
}

break;
},
Err(Error::WebSocket(WebSocketError::NoDataAvailable)) => break,
Err(Error::WebSocket(WebSocketError::NoDataAvailable)) => {
// This is hit when the websocket client dies this will be
// hit every iteration.
break;
},
other => other,
};

Expand Down
137 changes: 80 additions & 57 deletions src/gateway/shard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,8 @@ impl Shard {

match event {
Event::Ready(ref ready) => {
debug!("[Shard {:?}] Received Ready", self.shard_info);

self.session_id = Some(ready.ready.session_id.clone());
self.stage = ConnectionStage::Connected;

Expand Down Expand Up @@ -403,11 +405,15 @@ impl Shard {
self.heartbeat_instants.1 = Some(Instant::now());
self.last_heartbeat_acknowledged = true;

trace!("[Shard {}] Received heartbeat ack", self.shard_info[0]);
trace!("[Shard {:?}] Received heartbeat ack", self.shard_info);

Ok(None)
},
Ok(GatewayEvent::Hello(interval)) => {
debug!("[Shard {:?}] Received a Hello; interval: {}",
self.shard_info,
interval);

if interval > 0 {
self.heartbeat_interval = Some(interval);
}
Expand All @@ -417,6 +423,9 @@ impl Shard {

Ok(None)
} else {
debug!("[Shard {:?}] Received late Hello; autoreconnecting",
self.shard_info);

self.autoreconnect().and(Ok(None))
}
},
Expand Down Expand Up @@ -452,10 +461,17 @@ impl Shard {
}

match num {
Some(close_codes::UNKNOWN_OPCODE) => warn!("Sent invalid opcode"),
Some(close_codes::DECODE_ERROR) => warn!("Sent invalid message"),
Some(close_codes::UNKNOWN_OPCODE) => {
warn!("[Shard {:?}] Sent invalid opcode",
self.shard_info);
},
Some(close_codes::DECODE_ERROR) => {
warn!("[Shard {:?}] Sent invalid message",
self.shard_info);
},
Some(close_codes::NOT_AUTHENTICATED) => {
warn!("Sent no authentication");
warn!("[Shard {:?}] Sent no authentication",
self.shard_info);

return Err(Error::Gateway(GatewayError::NoAuthentication));
},
Expand All @@ -464,24 +480,30 @@ impl Shard {

return Err(Error::Gateway(GatewayError::InvalidAuthentication));
},
Some(close_codes::ALREADY_AUTHENTICATED) => warn!("Already authenticated"),
Some(close_codes::ALREADY_AUTHENTICATED) => {
warn!("[Shard {:?}] Already authenticated",
self.shard_info);
},
Some(close_codes::INVALID_SEQUENCE) => {
warn!(
"[Shard {:?}] Sent invalid seq: {}",
self.shard_info,
self.seq
);
warn!("[Shard {:?}] Sent invalid seq: {}",
self.shard_info,
self.seq);

self.seq = 0;
},
Some(close_codes::RATE_LIMITED) => warn!("Gateway ratelimited"),
Some(close_codes::RATE_LIMITED) => {
warn!("[Shard {:?}] Gateway ratelimited",
self.shard_info);
},
Some(close_codes::INVALID_SHARD) => {
warn!("Sent invalid shard data");
warn!("[Shard {:?}] Sent invalid shard data",
self.shard_info);

return Err(Error::Gateway(GatewayError::InvalidShardData));
},
Some(close_codes::SHARDING_REQUIRED) => {
error!("Shard has too many guilds");
error!("[Shard {:?}] Shard has too many guilds",
self.shard_info);

return Err(Error::Gateway(GatewayError::OverloadedShard));
},
Expand Down Expand Up @@ -519,11 +541,11 @@ impl Shard {
}
}

warn!("[Shard {:?}] Websocket error: {:?}", self.shard_info, why);
info!(
"[Shard {:?}] Will attempt to auto-reconnect",
self.shard_info
);
warn!("[Shard {:?}] Websocket error: {:?}",
self.shard_info,
why);
info!("[Shard {:?}] Will attempt to auto-reconnect",
self.shard_info);

self.autoreconnect().and(Ok(None))
},
Expand Down Expand Up @@ -661,6 +683,8 @@ impl Shard {
/// [`Guild`]: ../../model/struct.Guild.html
/// [`Member`]: ../../model/struct.Member.html
pub fn chunk_guilds(&mut self, guild_ids: &[GuildId], limit: Option<u16>, query: Option<&str>) {
debug!("[Shard {:?}] Requesting member chunks", self.shard_info);

let msg = json!({
"op": OpCode::GetGuildMembers.num(),
"d": {
Expand Down Expand Up @@ -738,11 +762,9 @@ impl Shard {
pub(crate) fn cycle_voice_recv(&mut self) {
if let Ok(v) = self.manager_rx.try_recv() {
if let Err(why) = self.client.send_json(&v) {
warn!(
"[Shard {:?}] Err sending voice msg: {:?}",
self.shard_info,
why
);
warn!("[Shard {:?}] Err sending voice msg: {:?}",
self.shard_info,
why);
}
}
}
Expand All @@ -753,36 +775,31 @@ impl Shard {
"op": OpCode::Heartbeat.num(),
});

trace!(
"[Shard {:?}] Sending heartbeat d: {}",
self.shard_info,
self.seq
);
trace!("[Shard {:?}] Sending heartbeat d: {}",
self.shard_info,
self.seq);

match self.client.send_json(&map) {
Ok(_) => {
self.heartbeat_instants.0 = Some(Instant::now());
self.last_heartbeat_acknowledged = false;

trace!("[{:02}] successfully heartbeated", self.shard_info[0]);
trace!("[Shard {:?}] Successfully heartbeated",
self.shard_info);

Ok(())
},
Err(why) => {
match why {
Error::WebSocket(WebSocketError::IoError(err)) => if err.raw_os_error() != Some(32) {
debug!(
"[Shard {:?}] Err w/ heartbeating: {:?}",
self.shard_info,
err
);
debug!("[Shard {:?}] Err heartbeating: {:?}",
self.shard_info,
err);
},
other => {
warn!(
"[Shard {:?}] Other err w/ keepalive: {:?}",
self.shard_info,
other
);
warn!("[Shard {:?}] Other err w/ keepalive: {:?}",
self.shard_info,
other);
},
}

Expand Down Expand Up @@ -812,29 +829,29 @@ impl Shard {
if !self.last_heartbeat_acknowledged {
debug!(
"[Shard {:?}] Last heartbeat not acknowledged; re-connecting",
self.shard_info
self.shard_info,
);

return self.reconnect().map_err(|why| {
warn!(
"[Shard {:?}] Err auto-reconnecting from heartbeat check: {:?}",
self.shard_info,
why
why,
);

why
});
}

// Otherwise, we're good to heartbeat.
trace!("[{:02}] heartbeating", self.shard_info[0]);
trace!("[Shard {:?}] Heartbeating", self.shard_info);

if let Err(why) = self.heartbeat() {
warn!("[Shard {:?}] Err heartbeating: {:?}", self.shard_info, why);

self.reconnect()
} else {
trace!("[{:02}] heartbeated", self.shard_info[0]);
trace!("[Shard {:?}] Heartbeated", self.shard_info);
self.heartbeat_instants.0 = Some(Instant::now());

Ok(())
Expand All @@ -847,17 +864,13 @@ impl Shard {
}

if self.session_id.is_some() {
debug!(
"[Shard {:?}] Autoreconnector choosing to resume",
self.shard_info
);
debug!("[Shard {:?}] Autoreconnector choosing to resume",
self.shard_info);

self.resume()
} else {
debug!(
"[Shard {:?}] Autoreconnector choosing to reconnect",
self.shard_info
);
debug!("[Shard {:?}] Autoreconnector choosing to reconnect",
self.shard_info);

self.reconnect()
}
Expand Down Expand Up @@ -887,8 +900,12 @@ impl Shard {
//
// [`reconnect`]: #method.reconnect
fn resume(&mut self) -> Result<()> {
debug!("Shard {:?}] Attempting to resume", self.shard_info);

self.send_resume().or_else(|why| {
warn!("Err sending resume: {:?}", why);
warn!("[Shard {:?}] Err sending resume: {:?}",
self.shard_info,
why);

self.reconnect()
})
Expand All @@ -900,6 +917,10 @@ impl Shard {
None => return Err(Error::Gateway(GatewayError::NoSessionId)),
};

debug!("[Shard {:?}] Sending resume; seq: {}",
self.shard_info,
self.seq);

self.client.send_json(&json!({
"op": OpCode::Resume.num(),
"d": {
Expand Down Expand Up @@ -936,6 +957,8 @@ impl Shard {

self.heartbeat_instants.0 = Some(Instant::now());

debug!("[Shard {:?}] Identifying", self.shard_info);

self.client.send_json(&identification)
}

Expand Down Expand Up @@ -965,12 +988,12 @@ impl Shard {
},
});

debug!("[Shard {:?}] Sending presence update", self.shard_info);

if let Err(why) = self.client.send_json(&msg) {
warn!(
"[Shard {:?}] Err sending presence update: {:?}",
self.shard_info,
why
);
warn!("[Shard {:?}] Err sending presence update: {:?}",
self.shard_info,
why);
}

#[cfg(feature = "cache")]
Expand Down

0 comments on commit 21e194b

Please sign in to comment.