Skip to content

Commit

Permalink
g3bench: add quic speed limit and io stats to h3 target
Browse files Browse the repository at this point in the history
  • Loading branch information
zh-jq-b committed Nov 3, 2023
1 parent 94c04d3 commit 044957a
Show file tree
Hide file tree
Showing 9 changed files with 246 additions and 59 deletions.
8 changes: 4 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions g3bench/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,21 @@ Benchmark tool for HTTP 1.x / HTTP 2 / HTTP 3 / TLS Handshake / DNS / Cloudflare
* GET / HEAD
* Socks5 proxy / Http Proxy / Https Proxy
* PROXY Protocol
* Socket Speed limit
* Socket Speed limit and IO stats (HTTP layer)

- *HTTP 2*

* GET / HEAD
* Socks5 proxy / Http Proxy / Https Proxy
* Connection Pool
* PROXY Protocol
* Socket Speed limit
* Socket Speed limit and IO stats (H2 layer)

- *HTTP 3*

* GET / HEAD
* Connection Pool
* Socket Speed limit and IO stats (QUIC layer)

- *TLS Handshake*

Expand Down
2 changes: 1 addition & 1 deletion g3bench/src/target/h1/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ pub async fn run(proc_args: &Arc<ProcArgs>, cmd_args: &ArgMatches) -> anyhow::Re
let target = HttpTarget {
args: Arc::new(http_args),
proc_args: Arc::clone(proc_args),
stats: Arc::new(HttpRuntimeStats::new(COMMAND)),
stats: Arc::new(HttpRuntimeStats::new_tcp(COMMAND)),
histogram: Some(histogram),
histogram_recorder,
};
Expand Down
2 changes: 1 addition & 1 deletion g3bench/src/target/h2/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ pub async fn run(proc_args: &Arc<ProcArgs>, cmd_args: &ArgMatches) -> anyhow::Re
h2_args.resolve_target_address(proc_args).await?;
let h2_args = Arc::new(h2_args);

let runtime_stats = Arc::new(HttpRuntimeStats::new(COMMAND));
let runtime_stats = Arc::new(HttpRuntimeStats::new_tcp(COMMAND));
let (histogram, histogram_recorder) = HttpHistogram::new();

let pool = h2_args.pool_size.map(|s| {
Expand Down
2 changes: 1 addition & 1 deletion g3bench/src/target/h3/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ pub async fn run(proc_args: &Arc<ProcArgs>, cmd_args: &ArgMatches) -> anyhow::Re
h3_args.resolve_target_address(proc_args).await?;
let h3_args = Arc::new(h3_args);

let runtime_stats = Arc::new(HttpRuntimeStats::new(COMMAND));
let runtime_stats = Arc::new(HttpRuntimeStats::new_udp(COMMAND));
let (histogram, histogram_recorder) = HttpHistogram::new();

let pool = h3_args.pool_size.map(|s| {
Expand Down
46 changes: 35 additions & 11 deletions g3bench/src/target/h3/opts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,19 @@
*/

use std::borrow::Cow;
use std::net::{IpAddr, Ipv6Addr, SocketAddr};
use std::net::{IpAddr, SocketAddr};
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;

use anyhow::{anyhow, Context};
use bytes::Bytes;
use clap::{value_parser, Arg, ArgAction, ArgMatches, Command};
use g3_io_ext::LimitedTokioRuntime;
use h3::client::SendRequest;
use h3_quinn::OpenStreams;
use http::{HeaderValue, Method, StatusCode};
use quinn::Endpoint;
use url::Url;

use g3_types::collection::{SelectiveVec, WeightedValue};
Expand Down Expand Up @@ -96,19 +98,41 @@ impl BenchH3Args {
Ok(())
}

fn new_quic_endpoint(
&self,
stats: &Arc<HttpRuntimeStats>,
proc_args: &ProcArgs,
) -> anyhow::Result<Endpoint> {
let peer = *proc_args.select_peer(&self.peer_addrs);
let socket = g3_socket::udp::new_std_socket_to(
peer,
self.bind,
Default::default(),
&Default::default(),
)
.map_err(|e| anyhow!("failed to setup local udp socket: {e}"))?;

let limit = &proc_args.udp_sock_speed_limit;
let runtime = LimitedTokioRuntime::new(
limit.shift_millis,
limit.max_north_packets,
limit.max_north_bytes,
limit.max_south_packets,
limit.max_south_bytes,
stats.clone(),
);
Endpoint::new(Default::default(), None, socket, Arc::new(runtime))
.map_err(|e| anyhow!("failed to create quic endpoint: {e}"))
}

async fn new_quic_connection(
&self,
stats: &Arc<HttpRuntimeStats>,
proc_args: &ProcArgs,
) -> anyhow::Result<h3_quinn::Connection> {
use quinn::{ClientConfig, Endpoint, TransportConfig, VarInt};
use quinn::{ClientConfig, TransportConfig, VarInt};

let bind_addr = if let Some(ip) = self.bind {
SocketAddr::new(ip, 0)
} else {
SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 0)
};
let endpoint = Endpoint::client(bind_addr)
.map_err(|e| anyhow!("failed to create quic endpoint: {e}"))?;
let endpoint = self.new_quic_endpoint(stats, proc_args)?;

let Some(tls_client) = &self.target_tls.client else {
unreachable!()
Expand Down Expand Up @@ -140,10 +164,10 @@ impl BenchH3Args {

pub(super) async fn new_h3_connection(
&self,
_stats: &Arc<HttpRuntimeStats>,
stats: &Arc<HttpRuntimeStats>,
proc_args: &ProcArgs,
) -> anyhow::Result<SendRequest<OpenStreams, Bytes>> {
let quic_conn = self.new_quic_connection(proc_args).await?;
let quic_conn = self.new_quic_connection(stats, proc_args).await?;

let mut client_builder = h3::client::builder();
// TODO add more client config
Expand Down
158 changes: 133 additions & 25 deletions g3bench/src/target/http/stats/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,35 @@ use std::time::Duration;

use cadence::{Counted, Gauged, StatsdClient};

use g3_io_ext::{LimitedReaderStats, LimitedWriterStats};
use g3_io_ext::{LimitedReaderStats, LimitedRecvStats, LimitedSendStats, LimitedWriterStats};

use crate::target::BenchRuntimeStats;

#[derive(Default)]
struct HttpUdpIoStats {
send_bytes: AtomicU64,
send_packets: AtomicU64,
send_bytes_total: AtomicU64,
send_packets_total: AtomicU64,
recv_bytes: AtomicU64,
recv_packets: AtomicU64,
recv_bytes_total: AtomicU64,
recv_packets_total: AtomicU64,
}

#[derive(Default)]
struct HttpTcpIoStats {
read: AtomicU64,
write: AtomicU64,
read_total: AtomicU64,
write_total: AtomicU64,
}

enum HttpIoStats {
Tcp(HttpTcpIoStats),
Udp(HttpUdpIoStats),
}

pub(crate) struct HttpRuntimeStats {
target: &'static str,
task_total: AtomicU64,
Expand All @@ -36,14 +61,19 @@ pub(crate) struct HttpRuntimeStats {
conn_close_error: AtomicU64,
conn_close_timeout: AtomicU64,

tcp_read: AtomicU64,
tcp_write: AtomicU64,
tcp_read_total: AtomicU64,
tcp_write_total: AtomicU64,
io: HttpIoStats,
}

impl HttpRuntimeStats {
pub(crate) fn new(target: &'static str) -> Self {
pub(crate) fn new_tcp(target: &'static str) -> Self {
HttpRuntimeStats::with_io(target, HttpIoStats::Tcp(HttpTcpIoStats::default()))
}

pub(crate) fn new_udp(target: &'static str) -> Self {
HttpRuntimeStats::with_io(target, HttpIoStats::Udp(HttpUdpIoStats::default()))
}

fn with_io(target: &'static str, io: HttpIoStats) -> Self {
HttpRuntimeStats {
target,
task_total: AtomicU64::new(0),
Expand All @@ -56,11 +86,7 @@ impl HttpRuntimeStats {
conn_success_total: AtomicU64::new(0),
conn_close_error: AtomicU64::new(0),
conn_close_timeout: AtomicU64::new(0),

tcp_read: AtomicU64::new(0),
tcp_write: AtomicU64::new(0),
tcp_read_total: AtomicU64::new(0),
tcp_write_total: AtomicU64::new(0),
io,
}
}

Expand Down Expand Up @@ -103,13 +129,45 @@ impl HttpRuntimeStats {

impl LimitedReaderStats for HttpRuntimeStats {
fn add_read_bytes(&self, size: usize) {
self.tcp_read.fetch_add(size as u64, Ordering::Relaxed);
if let HttpIoStats::Tcp(tcp) = &self.io {
tcp.read.fetch_add(size as u64, Ordering::Relaxed);
}
}
}

impl LimitedWriterStats for HttpRuntimeStats {
fn add_write_bytes(&self, size: usize) {
self.tcp_write.fetch_add(size as u64, Ordering::Relaxed);
if let HttpIoStats::Tcp(tcp) = &self.io {
tcp.write.fetch_add(size as u64, Ordering::Relaxed);
}
}
}

impl LimitedSendStats for HttpRuntimeStats {
fn add_send_bytes(&self, size: usize) {
if let HttpIoStats::Udp(udp) = &self.io {
udp.send_bytes.fetch_add(size as u64, Ordering::Relaxed);
}
}

fn add_send_packets(&self, n: usize) {
if let HttpIoStats::Udp(udp) = &self.io {
udp.send_packets.fetch_add(n as u64, Ordering::Relaxed);
}
}
}

impl LimitedRecvStats for HttpRuntimeStats {
fn add_recv_bytes(&self, size: usize) {
if let HttpIoStats::Udp(udp) = &self.io {
udp.recv_bytes.fetch_add(size as u64, Ordering::Relaxed);
}
}

fn add_recv_packets(&self, n: usize) {
if let HttpIoStats::Udp(udp) = &self.io {
udp.recv_packets.fetch_add(n as u64, Ordering::Relaxed);
}
}
}

Expand Down Expand Up @@ -143,10 +201,40 @@ impl BenchRuntimeStats for HttpRuntimeStats {
emit_count!(conn_success, "connection.success");
self.conn_success_total
.fetch_add(conn_success, Ordering::Relaxed);
emit_count!(tcp_write, "io.tcp.write");
self.tcp_write_total.fetch_add(tcp_write, Ordering::Relaxed);
emit_count!(tcp_read, "io.tcp.read");
self.tcp_read_total.fetch_add(tcp_read, Ordering::Relaxed);

macro_rules! emit_io_count {
($obj:ident, $field:ident, $name:literal) => {
let $field = $obj.$field.swap(0, Ordering::Relaxed);
let v = i64::try_from($field).unwrap_or(i64::MAX);
client
.count_with_tags(concat!("http.", $name), v)
.with_tag(TAG_NAME_TARGET, self.target)
.send();
};
}

match &self.io {
HttpIoStats::Tcp(tcp) => {
emit_io_count!(tcp, write, "io.tcp.write");
tcp.write_total.fetch_add(write, Ordering::Relaxed);
emit_io_count!(tcp, read, "io.tcp.read");
tcp.read_total.fetch_add(read, Ordering::Relaxed);
}
HttpIoStats::Udp(udp) => {
emit_io_count!(udp, send_bytes, "io.udp.send_bytes");
udp.send_bytes_total
.fetch_add(send_bytes, Ordering::Relaxed);
emit_io_count!(udp, send_packets, "io.udp.send_packets");
udp.send_packets_total
.fetch_add(send_packets, Ordering::Relaxed);
emit_io_count!(udp, recv_bytes, "io.udp.recv_bytes");
udp.recv_bytes_total
.fetch_add(recv_bytes, Ordering::Relaxed);
emit_io_count!(udp, recv_packets, "io.udp.recv_packets");
udp.recv_packets_total
.fetch_add(recv_packets, Ordering::Relaxed);
}
}
}

fn summary(&self, total_time: Duration) {
Expand Down Expand Up @@ -174,13 +262,33 @@ impl BenchRuntimeStats for HttpRuntimeStats {
}

println!("# Traffic");
let total_send =
self.tcp_write_total.load(Ordering::Relaxed) + self.tcp_write.load(Ordering::Relaxed);
println!("Send bytes: {total_send}");
println!("Send rate: {:.3}B/s", total_send as f64 / total_secs);
let total_recv =
self.tcp_read_total.load(Ordering::Relaxed) + self.tcp_read.load(Ordering::Relaxed);
println!("Recv bytes: {total_recv}");
println!("Recv rate: {:.3}B/s", total_recv as f64 / total_secs);
match &self.io {
HttpIoStats::Tcp(tcp) => {
let total_send =
tcp.write_total.load(Ordering::Relaxed) + tcp.write.load(Ordering::Relaxed);
println!("Send bytes: {total_send}");
println!("Send rate: {:.3}B/s", total_send as f64 / total_secs);
let total_recv =
tcp.read_total.load(Ordering::Relaxed) + tcp.read.load(Ordering::Relaxed);
println!("Recv bytes: {total_recv}");
println!("Recv rate: {:.3}B/s", total_recv as f64 / total_secs);
}
HttpIoStats::Udp(udp) => {
let total_send_bytes = udp.send_bytes_total.load(Ordering::Relaxed)
+ udp.send_bytes.load(Ordering::Relaxed);
println!("Send bytes: {total_send_bytes}");
println!(
"Send rate: {:.3}B/s",
total_send_bytes as f64 / total_secs
);
let total_recv_bytes = udp.recv_bytes_total.load(Ordering::Relaxed)
+ udp.recv_bytes.load(Ordering::Relaxed);
println!("Recv bytes: {total_recv_bytes}");
println!(
"Recv rate: {:.3}B/s",
total_recv_bytes as f64 / total_secs
);
}
}
}
}
1 change: 1 addition & 0 deletions lib/g3-io-ext/src/limit/fixed_window/datagram.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ pub enum DatagramLimitResult {
DelayFor(u64),
}

#[derive(Default)]
pub struct DatagramLimitInfo {
window: FixedWindow,

Expand Down
Loading

0 comments on commit 044957a

Please sign in to comment.