Refactor sessions to use socket pool (#815)
This commit refactors how we handle upstream connections to the
gameservers. When profiling Quilkin I noticed that a lot of time
(~10–15%) was being spent dropping the upstream socket through its Arc
implementation whenever a session was dropped.
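As a minimal illustration (plain std types, not Quilkin's session code) of why that shows up in a profile: in the old design each session effectively held the last reference to its own upstream socket, so dropping the session ran the socket's destructor and closed the file descriptor inline:

use std::net::UdpSocket;
use std::sync::Arc;

fn main() -> std::io::Result<()> {
    // One upstream socket per session, owned behind an Arc.
    let upstream = Arc::new(UdpSocket::bind("127.0.0.1:0")?);

    // The session holds a clone of the Arc; in the old design it was
    // effectively the only long-lived strong reference.
    let session_socket = Arc::clone(&upstream);
    drop(upstream);

    // Dropping the last Arc runs the UdpSocket destructor and closes the
    // file descriptor right here, on the session-teardown path.
    drop(session_socket);
    Ok(())
}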

While thinking about how to solve this problem I realised there was a
second issue: there is a limit on how many connections Quilkin can hold
at once, roughly ~16,383, because past that point we're likely to start
encountering port exhaustion from the operating system, since each
session is a unique socket and therefore a unique local port.
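That figure lines up with the size of the IANA dynamic port range: 65535 - 49152 + 1 = 16,384 ports. As a rough illustration of the failure mode (hypothetical code, not from this repository), binding ephemeral sockets in a loop eventually fails; note that on many systems the open-file-descriptor limit trips before the port range does:

use std::net::UdpSocket;

fn main() {
    let mut sockets = Vec::new();
    loop {
        // Binding to port 0 asks the OS for a fresh ephemeral port; each
        // live socket pins one port for the lifetime of the process.
        match UdpSocket::bind("127.0.0.1:0") {
            Ok(socket) => sockets.push(socket),
            Err(err) => {
                // Usually "address in use" (port exhaustion) or "too many
                // open files" (fd limit), whichever bound is hit first.
                eprintln!("gave out after {} sockets: {err}", sockets.len());
                break;
            }
        }
    }
}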

This brought me to the solution in this commit: while each connection
to a given gameserver needs a unique port, ports don't need to be
unique across gameservers. So I refactored how we create sessions to
use what I've called a "SessionPool", which pools the sockets for
sessions into a map keyed by their destination.
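A minimal sketch of that shape, with assumed names and simplified locking (the real SessionPool in src/proxy also routes return packets back to the right downstream client, records metrics, and participates in shutdown):

use std::collections::{HashMap, HashSet};
use std::net::SocketAddr;
use std::sync::{Arc, Mutex};
use tokio::net::UdpSocket;

#[derive(Default)]
struct PoolState {
    // Every pooled upstream socket, keyed by its local port.
    ports_to_sockets: HashMap<u16, Arc<UdpSocket>>,
    // Which local ports are already in use for each destination.
    destination_to_ports: HashMap<SocketAddr, HashSet<u16>>,
}

#[derive(Default)]
struct SessionPool {
    state: Mutex<PoolState>,
}

impl SessionPool {
    /// Returns a socket for a new session to `dest`, reusing any pooled
    /// socket whose port is not yet talking to `dest`, else binding fresh.
    async fn socket_for(&self, dest: SocketAddr) -> std::io::Result<Arc<UdpSocket>> {
        {
            let mut state = self.state.lock().unwrap();
            let reusable = state
                .ports_to_sockets
                .iter()
                .find(|(port, _)| {
                    state
                        .destination_to_ports
                        .get(&dest)
                        .map_or(true, |used| !used.contains(*port))
                })
                .map(|(port, socket)| (*port, socket.clone()));
            if let Some((port, socket)) = reusable {
                state.destination_to_ports.entry(dest).or_default().insert(port);
                return Ok(socket);
            }
        }
        // No reusable socket: bind a new ephemeral port and add it to the pool.
        let socket = Arc::new(UdpSocket::bind("0.0.0.0:0").await?);
        let port = socket.local_addr()?.port();
        let mut state = self.state.lock().unwrap();
        state.ports_to_sockets.insert(port, socket.clone());
        state.destination_to_ports.entry(dest).or_default().insert(port);
        Ok(socket)
    }
}

The invariant is that one local port may serve many sessions at once, so long as no two sessions to the same destination share a port; session teardown no longer closes a socket, which also removes the Arc-drop cost described above.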

With this implementation the limit becomes ~16,000 connections per
gameserver, which is far more than any single gameserver could
reasonably need.
XAMPPRocky authored Oct 16, 2023
1 parent cdd3e82 commit 439df95
Showing 26 changed files with 764 additions and 423 deletions.
17 changes: 16 additions & 1 deletion benches/throughput.rs
@@ -56,6 +56,9 @@ static THROUGHPUT_SERVER_INIT: Lazy<()> = Lazy::new(|| {
static FEEDBACK_LOOP: Lazy<()> = Lazy::new(|| {
std::thread::spawn(|| {
let socket = UdpSocket::bind(FEEDBACK_LOOP_ADDR).unwrap();
+ socket
+ .set_read_timeout(Some(std::time::Duration::from_millis(500)))
+ .unwrap();

loop {
let mut packet = [0; MESSAGE_SIZE];
@@ -74,6 +77,9 @@ fn throughput_benchmark(c: &mut Criterion) {
// Sleep to give the servers some time to warm-up.
std::thread::sleep(std::time::Duration::from_millis(500));
let socket = UdpSocket::bind(BENCH_LOOP_ADDR).unwrap();
+ socket
+ .set_read_timeout(Some(std::time::Duration::from_millis(500)))
+ .unwrap();
let mut packet = [0; MESSAGE_SIZE];

let mut group = c.benchmark_group("throughput");
@@ -125,6 +131,9 @@ fn write_feedback(addr: SocketAddr) -> mpsc::Sender<Vec<u8>> {
let (write_tx, write_rx) = mpsc::channel::<Vec<u8>>();
std::thread::spawn(move || {
let socket = UdpSocket::bind(addr).unwrap();
+ socket
+ .set_read_timeout(Some(std::time::Duration::from_millis(500)))
+ .unwrap();
let mut packet = [0; MESSAGE_SIZE];
let (_, source) = socket.recv_from(&mut packet).unwrap();
while let Ok(packet) = write_rx.recv() {
@@ -142,6 +151,9 @@ fn readwrite_benchmark(c: &mut Criterion) {
let (read_tx, read_rx) = mpsc::channel::<Vec<u8>>();
std::thread::spawn(move || {
let socket = UdpSocket::bind(READ_LOOP_ADDR).unwrap();
+ socket
+ .set_read_timeout(Some(std::time::Duration::from_millis(500)))
+ .unwrap();
let mut packet = [0; MESSAGE_SIZE];
loop {
let (length, _) = socket.recv_from(&mut packet).unwrap();
@@ -164,9 +176,12 @@ fn readwrite_benchmark(c: &mut Criterion) {
Lazy::force(&WRITE_SERVER_INIT);

// Sleep to give the servers some time to warm-up.
- std::thread::sleep(std::time::Duration::from_millis(500));
+ std::thread::sleep(std::time::Duration::from_millis(150));

let socket = UdpSocket::bind((Ipv4Addr::LOCALHOST, 0)).unwrap();
+ socket
+ .set_read_timeout(Some(std::time::Duration::from_millis(500)))
+ .unwrap();

// prime the direct write connection
socket.send_to(PACKETS[0], direct_write_addr).unwrap();
6 changes: 3 additions & 3 deletions src/cli.rs
@@ -418,7 +418,7 @@ mod tests {
std::fs::write(endpoints_file.path(), {
config.clusters.write().insert_default(
[Endpoint::with_metadata(
- (std::net::Ipv4Addr::LOCALHOST, server_port).into(),
+ (std::net::Ipv6Addr::LOCALHOST, server_port).into(),
crate::endpoint::Metadata {
tokens: vec![token.clone()].into_iter().collect(),
},
@@ -436,7 +436,7 @@

assert_eq!(
"hello",
- timeout(Duration::from_secs(5), rx.recv())
+ timeout(Duration::from_millis(500), rx.recv())
.await
.expect("should have received a packet")
.unwrap()
@@ -449,7 +449,7 @@
let msg = b"hello\xFF\xFF\xFF";
socket.send_to(msg, &proxy_address).await.unwrap();

- let result = timeout(Duration::from_secs(3), rx.recv()).await;
+ let result = timeout(Duration::from_millis(500), rx.recv()).await;
assert!(result.is_err(), "should not have received a packet");
tracing::info!(?token, "didn't receive bad packet");
}
80 changes: 60 additions & 20 deletions src/cli/proxy.rs
@@ -26,7 +26,7 @@ use std::{
use tonic::transport::Endpoint;

use super::Admin;
- use crate::{proxy::SessionMap, xds::ResourceType, Config, Result};
+ use crate::{proxy::SessionPool, xds::ResourceType, Config, Result};

#[cfg(doc)]
use crate::filters::FilterFactory;
@@ -81,9 +81,6 @@ impl Proxy {
mode: Admin,
mut shutdown_rx: tokio::sync::watch::Receiver<()>,
) -> crate::Result<()> {
- const SESSION_TIMEOUT_SECONDS: Duration = Duration::from_secs(60);
- const SESSION_EXPIRY_POLL_INTERVAL: Duration = Duration::from_secs(60);
-
let _mmdb_task = self.mmdb.clone().map(|source| {
tokio::spawn(async move {
use crate::config::BACKOFF_INITIAL_DELAY_MILLISECONDS;
@@ -122,8 +119,9 @@ impl Proxy {
let id = config.id.load();
tracing::info!(port = self.port, proxy_id = &*id, "Starting");

- let sessions = SessionMap::new(SESSION_TIMEOUT_SECONDS, SESSION_EXPIRY_POLL_INTERVAL);
let runtime_config = mode.unwrap_proxy();
+ let shared_socket = Arc::new(DualStackLocalSocket::new(self.port)?);
+ let sessions = SessionPool::new(config.clone(), shared_socket.clone(), shutdown_rx.clone());

let _xds_stream = if !self.management_server.is_empty() {
{
@@ -152,7 +150,7 @@
None
};

- self.run_recv_from(&config, sessions.clone())?;
+ self.run_recv_from(&config, &sessions, shared_socket)?;
crate::protocol::spawn(self.qcmp_port).await?;
tracing::info!("Quilkin is ready");

@@ -161,10 +159,10 @@
.await
.map_err(|error| eyre::eyre!(error))?;

- tracing::info!(sessions=%sessions.len(), "waiting for active sessions to expire");
- while sessions.is_not_empty() {
+ tracing::info!(sessions=%sessions.sessions().len(), "waiting for active sessions to expire");
+ while sessions.sessions().is_not_empty() {
tokio::time::sleep(Duration::from_secs(1)).await;
- tracing::debug!(sessions=%sessions.len(), "sessions still active");
+ tracing::debug!(sessions=%sessions.sessions().len(), "sessions still active");
}
tracing::info!("all sessions expired");

@@ -176,18 +174,29 @@
/// This function also spawns the set of worker tasks responsible for consuming packets
/// off the aforementioned queue and processing them through the filter chain and session
/// pipeline.
- fn run_recv_from(&self, config: &Arc<Config>, sessions: SessionMap) -> Result<()> {
+ fn run_recv_from(
+ &self,
+ config: &Arc<Config>,
+ sessions: &Arc<SessionPool>,
+ shared_socket: Arc<DualStackLocalSocket>,
+ ) -> Result<()> {
// The number of worker tasks to spawn. Each task gets a dedicated queue to
// consume packets off.
let num_workers = num_cpus::get();

// Contains config for each worker task.
let mut workers = Vec::with_capacity(num_workers);
- for worker_id in 0..num_workers {
- let socket = Arc::new(DualStackLocalSocket::new(self.port)?);
+ workers.push(crate::proxy::DownstreamReceiveWorkerConfig {
+ worker_id: 0,
+ socket: shared_socket,
+ config: config.clone(),
+ sessions: sessions.clone(),
+ });
+
+ for worker_id in 1..num_workers {
workers.push(crate::proxy::DownstreamReceiveWorkerConfig {
worker_id,
- socket: socket.clone(),
+ socket: Arc::new(DualStackLocalSocket::new(self.port)?),
config: config.clone(),
sessions: sessions.clone(),
})
@@ -241,6 +250,7 @@ mod tests {

t.run_server(config, proxy, None);

+ tracing::trace!(%local_addr, "sending hello");
let msg = "hello";
endpoint1
.socket
@@ -249,14 +259,14 @@
.unwrap();
assert_eq!(
msg,
- timeout(Duration::from_secs(1), endpoint1.packet_rx)
+ timeout(Duration::from_millis(100), endpoint1.packet_rx)
.await
.expect("should get a packet")
.unwrap()
);
assert_eq!(
msg,
- timeout(Duration::from_secs(1), endpoint2.packet_rx)
+ timeout(Duration::from_millis(100), endpoint2.packet_rx)
.await
.expect("should get a packet")
.unwrap()
@@ -268,7 +278,8 @@
let mut t = TestHelper::default();

let endpoint = t.open_socket_and_recv_single_packet().await;
- let local_addr = available_addr(&AddressType::Random).await;
+ let mut local_addr = available_addr(&AddressType::Ipv6).await;
+ crate::test_utils::map_addr_to_localhost(&mut local_addr);
let proxy = crate::cli::Proxy {
port: local_addr.port(),
..<_>::default()
@@ -277,14 +288,16 @@
config.clusters.modify(|clusters| {
clusters.insert_default(
[Endpoint::new(
- endpoint.socket.local_ipv4_addr().unwrap().into(),
+ endpoint.socket.local_ipv6_addr().unwrap().into(),
)]
.into(),
);
});
t.run_server(config, proxy, None);
+ tokio::time::sleep(std::time::Duration::from_millis(100)).await;

let msg = "hello";
+ tracing::debug!(%local_addr, "sending packet");
endpoint
.socket
.send_to(msg.as_bytes(), &local_addr)
@@ -366,8 +379,19 @@ mod tests {
crate::proxy::DownstreamReceiveWorkerConfig {
worker_id: 1,
socket: socket.clone(),
- config,
- sessions: <_>::default(),
+ config: config.clone(),
+ sessions: SessionPool::new(
+ config,
+ Arc::new(
+ DualStackLocalSocket::new(
+ crate::test_utils::available_addr(&AddressType::Random)
+ .await
+ .port(),
+ )
+ .unwrap(),
+ ),
+ tokio::sync::watch::channel(()).1,
+ ),
}
.spawn();

@@ -405,7 +429,23 @@ mod tests {
)
});

- proxy.run_recv_from(&config, <_>::default()).unwrap();
+ let shared_socket = Arc::new(
+ DualStackLocalSocket::new(
+ crate::test_utils::available_addr(&AddressType::Random)
+ .await
+ .port(),
+ )
+ .unwrap(),
+ );
+ let sessions = SessionPool::new(
+ config.clone(),
+ shared_socket.clone(),
+ tokio::sync::watch::channel(()).1,
+ );
+
+ proxy
+ .run_recv_from(&config, &sessions, shared_socket)
+ .unwrap();

let socket = create_socket().await;
socket.send_to(msg.as_bytes(), &local_addr).await.unwrap();
2 changes: 0 additions & 2 deletions src/filters/chain.rs
@@ -369,7 +369,6 @@ mod tests {
);

let mut context = WriteContext::new(
- endpoints_fixture[0].clone(),
endpoints_fixture[0].address.clone(),
"127.0.0.1:70".parse().unwrap(),
b"hello".to_vec(),
@@ -417,7 +416,6 @@
);

let mut context = WriteContext::new(
- endpoints_fixture[0].clone(),
endpoints_fixture[0].address.clone(),
"127.0.0.1:70".parse().unwrap(),
b"hello".to_vec(),
4 changes: 0 additions & 4 deletions src/filters/compress.rs
@@ -196,7 +196,6 @@ mod tests {

// write decompress
let mut write_context = WriteContext::new(
Endpoint::new("127.0.0.1:80".parse().unwrap()),
"127.0.0.1:8080".parse().unwrap(),
"127.0.0.1:8081".parse().unwrap(),
read_context.contents.clone(),
@@ -223,7 +222,6 @@

assert!(compression
.write(&mut WriteContext::new(
Endpoint::new("127.0.0.1:80".parse().unwrap()),
"127.0.0.1:8080".parse().unwrap(),
"127.0.0.1:8081".parse().unwrap(),
b"hello".to_vec(),
@@ -270,7 +268,6 @@ mod tests {
assert_eq!(b"hello", &*read_context.contents);

let mut write_context = WriteContext::new(
Endpoint::new("127.0.0.1:80".parse().unwrap()),
"127.0.0.1:8080".parse().unwrap(),
"127.0.0.1:8081".parse().unwrap(),
b"hello".to_vec(),
@@ -329,7 +326,6 @@ mod tests {
let expected = contents_fixture();
// write compress
let mut write_context = WriteContext::new(
Endpoint::new("127.0.0.1:80".parse().unwrap()),
"127.0.0.1:8080".parse().unwrap(),
"127.0.0.1:8081".parse().unwrap(),
expected.clone(),
8 changes: 6 additions & 2 deletions src/filters/debug.rs
@@ -50,8 +50,12 @@ impl Filter for Debug {

#[cfg_attr(feature = "instrument", tracing::instrument(skip(self, ctx)))]
async fn write(&self, ctx: &mut WriteContext) -> Result<(), FilterError> {
- info!(id = ?self.config.id, endpoint = ?ctx.endpoint.address, source = ?&ctx.source,
- dest = ?&ctx.dest, contents = ?String::from_utf8_lossy(&ctx.contents), "Write filter event");
+ info!(
+ id = ?self.config.id,
+ source = ?&ctx.source,
+ dest = ?&ctx.dest,
+ contents = ?String::from_utf8_lossy(&ctx.contents), "Write filter event"
+ );
Ok(())
}
}
16 changes: 3 additions & 13 deletions src/filters/firewall.rs
@@ -166,23 +166,13 @@ mod tests {
}],
};

- let endpoint = Endpoint::new((Ipv4Addr::LOCALHOST, 80).into());
let local_addr: crate::endpoint::EndpointAddress = (Ipv4Addr::LOCALHOST, 8081).into();

- let mut ctx = WriteContext::new(
- endpoint.clone(),
- ([192, 168, 75, 20], 80).into(),
- local_addr.clone(),
- vec![],
- );
+ let mut ctx =
+ WriteContext::new(([192, 168, 75, 20], 80).into(), local_addr.clone(), vec![]);
assert!(firewall.write(&mut ctx).await.is_ok());

- let mut ctx = WriteContext::new(
- endpoint,
- ([192, 168, 77, 20], 80).into(),
- local_addr,
- vec![],
- );
+ let mut ctx = WriteContext::new(([192, 168, 77, 20], 80).into(), local_addr, vec![]);
assert!(firewall.write(&mut ctx).await.is_err());
}
}
1 change: 0 additions & 1 deletion src/filters/match.rs
@@ -188,7 +188,6 @@ mod tests {
// no config, so should make no change.
filter
.write(&mut WriteContext::new(
- endpoint.clone(),
endpoint.address,
"127.0.0.1:70".parse().unwrap(),
contents.clone(),
2 changes: 1 addition & 1 deletion src/filters/registry.rs
@@ -113,7 +113,7 @@ mod tests {
.await
.is_ok());
assert!(filter
- .write(&mut WriteContext::new(endpoint, addr.clone(), addr, vec![],))
+ .write(&mut WriteContext::new(addr.clone(), addr, vec![],))
.await
.is_ok());
}