Skip to content

Commit

Permalink
Report connection stats back from the server
Browse files Browse the repository at this point in the history
- Adds ClosedownReport message to control channel
  • Loading branch information
crazyscot committed Oct 25, 2024
1 parent bdd0a7b commit cf91b96
Show file tree
Hide file tree
Showing 6 changed files with 178 additions and 27 deletions.
10 changes: 10 additions & 0 deletions schema/control.capnp
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,13 @@ struct ServerMessage {
warning @3: Text; # If present, a warning message to be relayed to a human
bandwidthInfo @4: Text; # Reports the server's active bandwidth configuration
}

struct ClosedownReport {
# Connection statistics sent Server -> Client on the control channel once all streams are finished.
finalCongestionWindow @0: UInt64; # Final congestion window
sentPackets @1: UInt64; # Sent packet count
lostPackets @2: UInt64; # Lost packet count
lostBytes @3: UInt64; # Total payload of the lost packets
congestionEvents @4: UInt64; # Number of congestion events detected
blackHoles @5: UInt64; # Number of black hole events detected
sentBytes @6: UInt64; # Sent byte count
}
14 changes: 11 additions & 3 deletions src/client/control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@

use std::{net::IpAddr, process::Stdio, time::Duration};

use anyhow::{Context as _, Result};
use anyhow::{anyhow, Context as _, Result};
use tokio::{io::AsyncReadExt as _, time::timeout};
use tracing::{debug, trace};

use crate::{
cert::Credentials,
cli::CliArgs,
protocol::control::{ClientMessage, ServerMessage, BANNER},
protocol::control::{ClientMessage, ClosedownReport, ServerMessage, BANNER},
};

/// The parameter set needed to set up the control channel
Expand Down Expand Up @@ -62,7 +62,6 @@ impl ControlChannel {
credentials: &Credentials,
server_address: IpAddr,
) -> Result<(ControlChannel, ServerMessage)> {
use anyhow::anyhow;
debug!("opening control channel");
let mut new1 = Self::launch(parameters)?;
new1.wait_for_banner(parameters.timeout).await?;
Expand Down Expand Up @@ -131,4 +130,13 @@ impl ControlChannel {
anyhow::ensure!(BANNER == read_banner, "server banner not as expected");
Ok(())
}

/// Reads the server's [`ClosedownReport`] from the control channel.
///
/// The report is read from the `stdout` handle of `self.process`.
///
/// # Errors
///
/// Returns an error if the process's `stdout` handle is not available, or if
/// [`ClosedownReport::read`] fails.
pub(crate) async fn read_closedown_report(&mut self) -> Result<ClosedownReport> {
    let pipe = self
        .process
        .stdout
        .as_mut()
        // Build the error lazily; `ok_or` would construct it even when stdout is present.
        .ok_or_else(|| anyhow!("could not access process stdout (can't happen?)"))?;
    ClosedownReport::read(pipe).await
}
}
4 changes: 4 additions & 0 deletions src/client/main_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,9 @@ pub(crate) async fn client_main(args: &CliArgs, progress: &MultiProgress) -> any
spinner.set_message("Shutting down");
// Forcibly (but gracefully) tear down QUIC. All the requests have completed or errored.
endpoint.close(1u8.into(), "finished".as_bytes());
let remote_stats = control.read_closedown_report().await?;
debug!("remote reported stats: {:?}", remote_stats);

let control_fut = control.close();
let _ = timeout(args.timeout, endpoint.wait_idle())
.await
Expand All @@ -132,6 +135,7 @@ pub(crate) async fn client_main(args: &CliArgs, progress: &MultiProgress) -> any
&connection2.stats(),
total_bytes,
transport_time,
remote_stats,
);
}

Expand Down
102 changes: 100 additions & 2 deletions src/protocol/control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,26 @@
* The two processes are usually connected by stdio, via ssh.
*
* The protocol looks like this:
* (Client creates remote server process)
* (Client creates remote process, which we call the Server)
* Server -> Client: Banner
* C -> S: `ClientMessage`
* S -> C: `ServerMessage`
* The client then establishes a QUIC connection to the server, on the port given in the `ServerMessage`.
* The client then opens one or more bidirectional QUIC streams ('sessions') on that connection.
* See the session protocol for what happens there.
* When all streams are finished:
* C -> S: `Closedown`
* S -> C: `ClosedownReport`
* C -> S: (closes control channel; server process exits)
*
* On the wire the Client and Server messages are sent using capnproto with standard framing.
* On the wire these messages are sent using standard capnproto framing.
*/

use crate::util::AddressFamily;

use anyhow::Result;
use capnp::message::ReaderOptions;
use quinn::ConnectionStats;
use tokio_util::compat::{TokioAsyncReadCompatExt as _, TokioAsyncWriteCompatExt as _};

/// Low-level protocol structures and serialisation, autogenerated from control.capnp
Expand Down Expand Up @@ -171,6 +176,99 @@ impl ServerMessage {
}
}

/// Marker type for the capnp `Closedown` message.
///
/// It carries no data of its own; it exists to host the (de)serialisation helpers.
#[derive(Clone, Copy, Debug)]
pub struct Closedown {}

impl Closedown {
    /// Writes a `Closedown` to `write`, as a default-constructed capnp message
    /// on which no fields have been set.
    pub async fn write<W>(write: &mut W) -> Result<()>
    where
        W: tokio::io::AsyncWrite + Unpin,
    {
        let empty_message = ::capnp::message::Builder::new_default();
        let sink = write.compat_write();
        capnp_futures::serialize::write_message(sink, &empty_message).await?;
        Ok(())
    }

    /// Reads one capnp message from `read` and discards its contents.
    pub async fn read<R>(read: &mut R) -> anyhow::Result<Self>
    where
        R: tokio::io::AsyncRead + Unpin,
    {
        let source = read.compat();
        // Nothing inside the message is inspected; only a successful read matters here.
        let _message = capnp_futures::serialize::read_message(source, ReaderOptions::new()).await?;
        Ok(Closedown {})
    }
}

/// Connection statistics exchanged at closedown; the Rust-side counterpart of
/// the capnp `ClosedownReport` message.
#[derive(Clone, Copy, Debug)]
pub struct ClosedownReport {
    /// Congestion window at the end of the connection
    pub cwnd: u64,
    /// Number of packets sent
    pub sent_packets: u64,
    /// Number of bytes sent
    pub sent_bytes: u64,
    /// Number of packets lost
    pub lost_packets: u64,
    /// Total payload of the lost packets
    pub lost_bytes: u64,
    /// Number of congestion events detected
    pub congestion_events: u64,
    /// Number of black hole events detected
    pub black_holes_detected: u64,
}

impl ClosedownReport {
    /// Serialises the relevant counters from `stats` to `write` as a capnp `ClosedownReport`.
    ///
    /// The sent byte count is taken from the UDP transmit counters (`stats.udp_tx`);
    /// every other field comes from the path statistics (`stats.path`).
    pub async fn write<W>(write: &mut W, stats: &ConnectionStats) -> Result<()>
    where
        W: tokio::io::AsyncWrite + Unpin,
    {
        let path = &stats.path;
        let mut message = ::capnp::message::Builder::new_default();
        {
            // Populate the root struct; the builder borrow ends with this scope.
            let mut report = message.init_root::<control_capnp::closedown_report::Builder<'_>>();
            report.set_final_congestion_window(path.cwnd);
            report.set_sent_packets(path.sent_packets);
            report.set_lost_packets(path.lost_packets);
            report.set_lost_bytes(path.lost_bytes);
            report.set_congestion_events(path.congestion_events);
            report.set_black_holes(path.black_holes_detected);
            report.set_sent_bytes(stats.udp_tx.bytes);
        }
        capnp_futures::serialize::write_message(write.compat_write(), &message).await?;
        Ok(())
    }

    /// Reads one capnp `ClosedownReport` message from `read` and copies its fields out.
    pub async fn read<R>(read: &mut R) -> anyhow::Result<Self>
    where
        R: tokio::io::AsyncRead + Unpin,
    {
        let message =
            capnp_futures::serialize::read_message(read.compat(), ReaderOptions::new()).await?;
        let report = message.get_root::<control_capnp::closedown_report::Reader<'_>>()?;
        Ok(Self {
            cwnd: report.get_final_congestion_window(),
            sent_packets: report.get_sent_packets(),
            sent_bytes: report.get_sent_bytes(),
            lost_packets: report.get_lost_packets(),
            lost_bytes: report.get_lost_bytes(),
            congestion_events: report.get_congestion_events(),
            black_holes_detected: report.get_black_holes(),
        })
    }
}

#[cfg(test)]
mod tests {

Expand Down
25 changes: 19 additions & 6 deletions src/server/main_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::sync::Arc;

use crate::cert::Credentials;
use crate::cli::CliArgs;
use crate::protocol::control::{ClientMessage, ServerMessage};
use crate::protocol::control::{ClientMessage, ClosedownReport, ServerMessage};
use crate::protocol::session::{session_capnp::Status, Command, FileHeader, FileTrailer, Response};
use crate::protocol::{self, StreamPair};
use crate::transport::{BandwidthConfig, BandwidthParams};
Expand All @@ -17,9 +17,10 @@ use anyhow::Context as _;
use quinn::crypto::rustls::QuicServerConfig;
use quinn::rustls::server::WebPkiClientVerifier;
use quinn::rustls::{self, RootCertStore};
use quinn::EndpointConfig;
use quinn::{ConnectionStats, EndpointConfig};
use rustls_pki_types::CertificateDer;
use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _, BufReader};
use tokio::sync::oneshot;
use tokio::task::JoinSet;
use tokio::time::timeout;
use tracing::{debug, error, info, trace, trace_span, warn, Instrument};
Expand Down Expand Up @@ -75,13 +76,20 @@ pub(crate) async fn server_main(args: &CliArgs) -> anyhow::Result<()> {
// We have tight control over what we expect (TLS peer certificate/name) so only need to handle one successful connection,
// but a timeout is useful to give the user a cue that UDP isn't getting there.
trace!("waiting for QUIC");
let (stats_tx, mut stats_rx) = oneshot::channel();
if let Some(conn) = timeout(args.timeout, endpoint.accept())
.await
.with_context(|| "Timed out waiting for QUIC connection")?
{
let _ = tasks.spawn(async move {
if let Err(e) = handle_connection(conn, file_buffer_size).await {
error!("inward stream failed: {reason}", reason = e.to_string());
let result = handle_connection(conn, file_buffer_size).await;
match result {
Err(e) => error!("inward stream failed: {reason}", reason = e.to_string()),
Ok(conn_stats) => {
let _ = stats_tx.send(conn_stats).inspect_err(|_| {
warn!("unable to pass connection stats; possible logic error");
});
}
}
trace!("connection completed");
});
Expand All @@ -94,6 +102,8 @@ pub(crate) async fn server_main(args: &CliArgs) -> anyhow::Result<()> {
let _ = tasks.join_all().await;
endpoint.close(1u8.into(), "finished".as_bytes());
endpoint.wait_idle().await;
let stats = stats_rx.try_recv().unwrap_or_default();
ClosedownReport::write(&mut stdout, &stats).await?;
trace!("finished");
Ok(())
}
Expand Down Expand Up @@ -152,7 +162,10 @@ fn create_endpoint(
))
}

async fn handle_connection(conn: quinn::Incoming, file_buffer_size: usize) -> anyhow::Result<()> {
async fn handle_connection(
conn: quinn::Incoming,
file_buffer_size: usize,
) -> anyhow::Result<ConnectionStats> {
let connection = conn.await?;
info!("accepted connection from {}", connection.remote_address());

Expand Down Expand Up @@ -184,7 +197,7 @@ async fn handle_connection(conn: quinn::Incoming, file_buffer_size: usize) -> an
}
}
.await?;
Ok(())
Ok(connection.stats())
}

async fn handle_stream(mut sp: StreamPair, file_buffer_size: usize) -> anyhow::Result<()> {
Expand Down
50 changes: 34 additions & 16 deletions src/util/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@

use human_repr::{HumanCount, HumanDuration, HumanThroughput};
use quinn::ConnectionStats;
use std::{fmt::Display, time::Duration};
use std::{cmp, fmt::Display, time::Duration};
use tracing::{info, warn};

use crate::cli::CliArgs;
use crate::{cli::CliArgs, protocol::control::ClosedownReport};

/// Human friendly output helper
#[derive(Debug, Clone, Copy)]
Expand Down Expand Up @@ -54,6 +54,7 @@ pub(crate) fn output_statistics(
stats: &ConnectionStats,
payload_bytes: u64,
transport_time: Option<Duration>,
remote_stats: ClosedownReport,
) {
if payload_bytes != 0 {
let size = payload_bytes.human_count_bytes();
Expand All @@ -62,14 +63,18 @@ pub(crate) fn output_statistics(
transport_time.map_or("unknown".to_string(), |d| d.human_duration().to_string());
info!("Transferred {size} in {transport_time_str}; average {rate}");
}
if stats.path.congestion_events > 0 {
warn!(
"Congestion events: {}",
stats.path.congestion_events.human_count_bare()
if args.statistics {
info!(
"Total packets sent: {} by us, {} by remote",
stats.path.sent_packets, remote_stats.sent_packets
);
}
if args.statistics {
info!("Sent packets: {}", stats.path.sent_packets);
let congestion = stats.path.congestion_events + remote_stats.congestion_events;
if congestion > 0 {
warn!(
"Congestion events detected: {}",
congestion.human_count_bare()
);
}
if stats.path.lost_packets > 0 {
#[allow(clippy::cast_precision_loss)]
Expand All @@ -81,33 +86,46 @@ pub(crate) fn output_statistics(
bytes = stats.path.lost_bytes.human_count_bytes(),
);
}
if remote_stats.lost_packets > 0 {
#[allow(clippy::cast_precision_loss)]
let pct = 100. * remote_stats.lost_packets as f64 / remote_stats.sent_packets as f64;
warn!(
"Remote lost packets: {count}/{total} ({pct:.2}%, for {bytes})",
count = remote_stats.lost_packets.human_count_bare(),
total = remote_stats.sent_packets,
bytes = remote_stats.lost_bytes.human_count_bytes(),
);
}

let total_bytes = stats.udp_tx.bytes + stats.udp_rx.bytes;
let sender_sent_bytes = cmp::max(stats.udp_tx.bytes, remote_stats.sent_bytes);
if args.statistics {
let cwnd = cmp::max(stats.path.cwnd, remote_stats.cwnd);
info!(
"Path MTU {pmtu}, round-trip time {rtt}",
"Path MTU {pmtu}, round-trip time {rtt}, final congestion window {cwnd}",
pmtu = stats.path.current_mtu,
rtt = stats.path.rtt.human_duration(),
);
let black_holes = stats.path.black_holes_detected + remote_stats.black_holes_detected;
info!(
"{tx} datagrams sent, {rx} received, {bhd} black holes detected",
"{tx} datagrams sent, {rx} received, {black_holes} black holes detected",
tx = stats.udp_tx.datagrams.human_count_bare(),
rx = stats.udp_rx.datagrams.human_count_bare(),
bhd = stats.path.black_holes_detected,
);
if payload_bytes != 0 {
#[allow(clippy::cast_precision_loss)]
let overhead_pct = 100. * (total_bytes - payload_bytes) as f64 / payload_bytes as f64;
let overhead_pct =
100. * (sender_sent_bytes - payload_bytes) as f64 / payload_bytes as f64;
info!(
"{} total bytes transferred for {} bytes payload ({:.2}% overhead)",
total_bytes, payload_bytes, overhead_pct
"{} total bytes sent for {} bytes payload ({:.2}% overhead/loss)",
sender_sent_bytes, payload_bytes, overhead_pct
);
}
}
if stats.path.rtt.as_millis() > args.rtt.into() {
warn!(
"Measured path RTT {rtt_measured:?} was greater than configuration; for better performance, next time try --rtt {rtt_param}",
"Measured path RTT {rtt_measured:?} was greater than configuration {rtt_arg}; for better performance, next time try --rtt {rtt_param}",
rtt_measured = stats.path.rtt,
rtt_arg = args.rtt,
rtt_param = stats.path.rtt.as_millis()+1, // round up
);
}
Expand Down

0 comments on commit cf91b96

Please sign in to comment.