Skip to content

Commit

Permalink
Add shard latency tracking
Browse files Browse the repository at this point in the history
  • Loading branch information
Austin Hellyer committed Jan 14, 2017
1 parent 0a2f5ab commit 096b0f5
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 7 deletions.
17 changes: 12 additions & 5 deletions src/client/gateway/prep.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@ use serde_json::builder::ObjectBuilder;
use serde_json::Value;
use std::net::Shutdown;
use std::sync::mpsc::{
TryRecvError,
Receiver as MpscReceiver,
Sender as MpscSender
Sender as MpscSender,
TryRecvError,
};
use std::time::Duration as StdDuration;
use std::sync::{Arc, Mutex};
use std::time::{Duration as StdDuration, Instant};
use std::{env, thread};
use super::super::ClientError;
use super::{GatewayError, GatewayStatus};
Expand Down Expand Up @@ -88,6 +89,7 @@ pub fn build_gateway_url(base: &str) -> Result<RequestUrl> {
}

pub fn keepalive(interval: u64,
heartbeat_sent: Arc<Mutex<Instant>>,
mut sender: Sender<WebSocketStream>,
channel: MpscReceiver<GatewayStatus>) {
let mut base_interval = Duration::milliseconds(interval as i64);
Expand Down Expand Up @@ -129,8 +131,13 @@ pub fn keepalive(interval: u64,

trace!("Sending heartbeat d: {}", last_sequence);

if let Err(why) = sender.send_json(&map) {
warn!("Error sending keepalive: {:?}", why);
match sender.send_json(&map) {
Ok(_) => {
let now = Instant::now();

*heartbeat_sent.lock().unwrap() = now;
},
Err(why) => warn!("Error sending keepalive: {:?}", why),
}
}
}
Expand Down
28 changes: 26 additions & 2 deletions src/client/gateway/shard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ use serde_json::builder::ObjectBuilder;
use std::io::Write;
use std::net::Shutdown;
use std::sync::mpsc::{self, Sender as MpscSender};
use std::sync::{Arc, Mutex};
use std::thread::{self, Builder as ThreadBuilder};
use std::time::Duration as StdDuration;
use std::time::{Duration as StdDuration, Instant};
use std::mem;
use super::super::login_type::LoginType;
use super::super::rest;
Expand Down Expand Up @@ -63,6 +64,13 @@ type CurrentPresence = (Option<Game>, OnlineStatus, bool);
/// [module docs]: index.html#sharding
pub struct Shard {
current_presence: CurrentPresence,
/// A tuple of the last instant that a heartbeat was sent, and the last that
/// an acknowledgement was received.
///
/// This can be used to calculate [`latency`].
///
/// [`latency`]: fn.latency.html
heartbeat_instants: (Arc<Mutex<Instant>>, Option<Instant>),
keepalive_channel: MpscSender<GatewayStatus>,
seq: u64,
login_type: LoginType,
Expand Down Expand Up @@ -131,9 +139,15 @@ impl Shard {
info[1] - 1),
None => "serenity keepalive [unsharded]".to_owned(),
};

let heartbeat_sent = Arc::new(Mutex::new(Instant::now()));
let heartbeat_clone = heartbeat_sent.clone();

ThreadBuilder::new()
.name(thread_name)
.spawn(move || prep::keepalive(heartbeat_interval, sender, rx))?;
.spawn(move || {
prep::keepalive(heartbeat_interval, heartbeat_clone, sender, rx)
})?;

// Parse READY
let event = receiver.recv_json(GatewayEvent::decode)?;
Expand All @@ -145,6 +159,7 @@ impl Shard {
Ok((feature_voice! {{
Shard {
current_presence: (None, OnlineStatus::Online, false),
heartbeat_instants: (heartbeat_sent, None),
keepalive_channel: tx.clone(),
seq: sequence,
login_type: login_type,
Expand All @@ -157,6 +172,7 @@ impl Shard {
} else {
Shard {
current_presence: (None, OnlineStatus::Online, false),
heartbeat_instants: (heartbeat_sent, None),
keepalive_channel: tx.clone(),
seq: sequence,
login_type: login_type,
Expand Down Expand Up @@ -303,6 +319,8 @@ impl Shard {
Ok(None)
},
Ok(GatewayEvent::HeartbeatAck) => {
self.heartbeat_instants.1 = Some(Instant::now());

Ok(None)
},
Ok(GatewayEvent::Hello(interval)) => {
Expand Down Expand Up @@ -426,6 +444,12 @@ impl Shard {
}
}

/// Calculates the heartbeat latency (in nanoseconds) between the shard and
/// Discord.
pub fn latency(&self) -> Option<StdDuration> {
self.heartbeat_instants.1.map(|send| send - *self.heartbeat_instants.0.lock().unwrap())
}

/// Shuts down the receiver by attempting to cleanly close the
/// connection.
#[doc(hidden)]
Expand Down

0 comments on commit 096b0f5

Please sign in to comment.