Skip to content

Commit

Permalink
Locally listen on IPv4 and IPv6 addresses
Browse files Browse the repository at this point in the history
Implemented the `DualStackLocalSocket` for the locally listening socket,
and updated the code base and fixed several issues that existed for
handling IPv6 addresses in the code base.

This included updating the `test_utils` framework to provide coverage in
tests for both ipv4 and ipv6 by allowing for either an IPv4 or IPv6
address to be created from the framework.

Closes googleforgames#666
  • Loading branch information
markmandel committed Aug 23, 2023
1 parent 1fc3954 commit f1b98a2
Show file tree
Hide file tree
Showing 31 changed files with 527 additions and 282 deletions.
3 changes: 2 additions & 1 deletion benches/throughput.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::time;

use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
use once_cell::sync::Lazy;
use quilkin::test_utils::AddressType;

const MESSAGE_SIZE: usize = 0xffff;
const DEFAULT_MESSAGE: [u8; 0xffff] = [0xff; 0xffff];
Expand Down Expand Up @@ -35,7 +36,7 @@ fn run_quilkin(port: u16, endpoint: SocketAddr) {
let proxy = quilkin::cli::Proxy {
port,
qcmp_port: runtime
.block_on(quilkin::test_utils::available_addr())
.block_on(quilkin::test_utils::available_addr(&AddressType::Random))
.port(),
..<_>::default()
};
Expand Down
6 changes: 3 additions & 3 deletions docs/src/deployment/admin.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
# Administration

| services | ports | Protocol |
|----------|-------|-----------|
| Administration | 8000 | HTTP (IPv4 OR IPv6) |
| services | ports | Protocol |
|----------------|-------|---------------------|
| Administration | 8000 | HTTP (IPv4 OR IPv6) |

## Logging
By default, Quilkin will log `INFO` level events, you can change this by setting
Expand Down
2 changes: 1 addition & 1 deletion docs/src/services/agent.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

| services | ports | Protocol |
|----------|-------|-----------|
| QCMP | 7600 | UDP(IPv4 && IPv6) |
| QCMP | 7600 | UDP(IPv4 OR IPv6) |

> **Note:** This service is currently in active experimentation and development
so there may be bugs which cause it to be unusable for production, as always
Expand Down
4 changes: 2 additions & 2 deletions docs/src/services/proxy.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

| Services | Ports | Protocol |
|----------|-------|--------------------|
| Proxy | 7777 | UDP (IPv4) |
| QCMP | 7600 | UDP (IPv4 && IPv6) |
| Proxy | 7777 | UDP (IPv4 OR IPv6) |
| QCMP | 7600 | UDP (IPv4 OR IPv6) |

"Proxy" is the primary Quilkin service, which acts as a non-transparent UDP
proxy.
Expand Down
2 changes: 1 addition & 1 deletion docs/src/services/proxy/qcmp.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

| services | ports | Protocol |
|----------|-------|-----------|
| QCMP | 7600 | UDP (IPv4 && IPv6) |
| QCMP | 7600 | UDP (IPv4 OR IPv6) |

In addition to the TCP based administration API, Quilkin provides a meta API
over UDP. The purpose of this API is to provide meta operations that can be
Expand Down
6 changes: 3 additions & 3 deletions docs/src/services/xds.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
# xDS Control Plane

| services | ports | Protocol |
|----------|-------|------------|
| xDS | 7800 | gRPC(IPv4) |
| services | ports | Protocol |
|----------|-------|---------------------|
| xDS | 7800 | gRPC (IPv4 OR IPv6) |

For multi-cluster integration, Quilkin provides a `manage` service, that can be
used with a number of configuration discovery providers to provide cluster
Expand Down
2 changes: 1 addition & 1 deletion proto/protoc-gen-validate
66 changes: 32 additions & 34 deletions src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,26 +228,21 @@ impl Cli {
#[cfg(test)]
mod tests {
use super::*;
use std::net::Ipv4Addr;
use std::net::{Ipv4Addr, SocketAddr};

use tokio::{
net::UdpSocket,
time::{timeout, Duration},
};
use tokio::time::{timeout, Duration};

use crate::{
config::{Filter, Providers},
endpoint::{Endpoint, LocalityEndpoints},
filters::{Capture, StaticFilter, TokenRouter},
test_utils::{create_socket, AddressType, TestHelper},
};

#[tokio::test]
async fn relay_routing() {
let server_port = crate::test_utils::available_addr().await.port();
let server_socket = UdpSocket::bind((Ipv4Addr::UNSPECIFIED, server_port))
.await
.map(Arc::new)
.unwrap();
let mut t = TestHelper::default();
let (mut rx, server_socket) = t.open_socket_and_recv_multiple_packets().await;
let filters_file = tempfile::NamedTempFile::new().unwrap();
let config = Config::default();

Expand Down Expand Up @@ -286,7 +281,11 @@ mod tests {
.write()
.default_cluster_mut()
.insert(LocalityEndpoints::from(vec![Endpoint::with_metadata(
(std::net::Ipv4Addr::LOCALHOST, server_port).into(),
(
std::net::Ipv4Addr::LOCALHOST,
server_socket.local_ipv4_addr().unwrap().port(),
)
.into(),
crate::endpoint::Metadata {
tokens: vec!["abc".into()].into_iter().collect(),
},
Expand All @@ -295,7 +294,9 @@ mod tests {
})
.unwrap();

let relay_admin_port = crate::test_utils::available_addr().await.port();
let relay_admin_port = crate::test_utils::available_addr(&AddressType::Random)
.await
.port();
let relay = Cli {
admin_address: Some((Ipv4Addr::LOCALHOST, relay_admin_port).into()),
config: <_>::default(),
Expand All @@ -309,7 +310,9 @@ mod tests {
}),
};

let control_plane_admin_port = crate::test_utils::available_addr().await.port();
let control_plane_admin_port = crate::test_utils::available_addr(&AddressType::Random)
.await
.port();
let control_plane = Cli {
no_admin: false,
quiet: true,
Expand All @@ -327,7 +330,9 @@ mod tests {
}),
};

let proxy_admin_port = crate::test_utils::available_addr().await.port();
let proxy_admin_port = crate::test_utils::available_addr(&AddressType::Random)
.await
.port();
let proxy = Cli {
no_admin: false,
quiet: true,
Expand All @@ -344,12 +349,9 @@ mod tests {
tokio::time::sleep(Duration::from_millis(500)).await;
tokio::spawn(proxy.drive());
tokio::time::sleep(Duration::from_millis(500)).await;
let local_addr = crate::test_utils::available_addr().await;
let socket = UdpSocket::bind((Ipv4Addr::UNSPECIFIED, local_addr.port()))
.await
.map(Arc::new)
.unwrap();
let socket = create_socket().await;
let config = Config::default();
let proxy_address: SocketAddr = (std::net::Ipv4Addr::LOCALHOST, 7777).into();

for _ in 0..5 {
let token = random_three_characters();
Expand All @@ -361,7 +363,11 @@ mod tests {
.write()
.default_cluster_mut()
.insert(LocalityEndpoints::from(vec![Endpoint::with_metadata(
(std::net::Ipv4Addr::LOCALHOST, server_port).into(),
(
std::net::Ipv4Addr::LOCALHOST,
server_socket.local_ipv4_addr().unwrap().port(),
)
.into(),
crate::endpoint::Metadata {
tokens: vec![token.clone()].into_iter().collect(),
},
Expand All @@ -373,32 +379,24 @@ mod tests {
let mut msg = Vec::from(*b"hello");
msg.extend_from_slice(&token);
tracing::info!(?token, "sending packet");
socket
.send_to(&msg, &(std::net::Ipv4Addr::LOCALHOST, 7777))
.await
.unwrap();

let recv = |socket: Arc<UdpSocket>| async move {
let mut buf = [0; u16::MAX as usize];
let length = socket.recv(&mut buf).await.unwrap();
buf[0..length].to_vec()
};
socket.send_to(&msg, &proxy_address).await.unwrap();

assert_eq!(
b"hello",
&&*timeout(Duration::from_secs(5), (recv)(server_socket.clone()))
"hello",
timeout(Duration::from_secs(5), rx.recv())
.await
.expect("should have received a packet")
.unwrap()
);

tracing::info!(?token, "received packet");

tracing::info!(?token, "sending bad packet");
// send an invalid packet
let msg = b"hello\xFF\xFF\xFF";
socket.send_to(msg, &local_addr).await.unwrap();
socket.send_to(msg, &proxy_address).await.unwrap();

let result = timeout(Duration::from_secs(3), (recv)(server_socket.clone())).await;
let result = timeout(Duration::from_secs(3), rx.recv()).await;
assert!(result.is_err(), "should not have received a packet");
tracing::info!(?token, "didn't receive bad packet");
}
Expand Down
30 changes: 15 additions & 15 deletions src/cli/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@ use std::{net::SocketAddr, sync::Arc, time::Duration};

use tonic::transport::Endpoint;

use crate::{proxy::SessionMap, utils::net, xds::ResourceType, Config, Result};
use crate::{proxy::SessionMap, xds::ResourceType, Config, Result};

#[cfg(doc)]
use crate::filters::FilterFactory;
use crate::utils::net::DualStackLocalSocket;

define_port!(7777);

Expand Down Expand Up @@ -153,7 +154,7 @@ impl Proxy {
// Contains config for each worker task.
let mut workers = Vec::with_capacity(num_workers);
for worker_id in 0..num_workers {
let socket = Arc::new(net::socket_with_reuse(self.port)?);
let socket = Arc::new(DualStackLocalSocket::new(self.port)?);
workers.push(crate::proxy::DownstreamReceiveWorkerConfig {
worker_id,
socket: socket.clone(),
Expand Down Expand Up @@ -181,7 +182,7 @@ mod tests {
use crate::{
config,
endpoint::Endpoint,
test_utils::{available_addr, create_socket, load_test_filters, TestHelper},
test_utils::{available_addr, create_socket, load_test_filters, AddressType, TestHelper},
};

#[tokio::test]
Expand All @@ -191,7 +192,7 @@ mod tests {
let endpoint1 = t.open_socket_and_recv_single_packet().await;
let endpoint2 = t.open_socket_and_recv_single_packet().await;

let local_addr = available_addr().await;
let local_addr = available_addr(&AddressType::Random).await;
let proxy = crate::cli::Proxy {
port: local_addr.port(),
..<_>::default()
Expand All @@ -200,8 +201,8 @@ mod tests {
let config = Arc::new(crate::Config::default());
config.clusters.modify(|clusters| {
clusters.insert_default(vec![
Endpoint::new(endpoint1.socket.local_addr().unwrap().into()),
Endpoint::new(endpoint2.socket.local_addr().unwrap().into()),
Endpoint::new(endpoint1.socket.local_ipv4_addr().unwrap().into()),
Endpoint::new(endpoint2.socket.local_ipv6_addr().unwrap().into()),
])
});

Expand Down Expand Up @@ -234,16 +235,15 @@ mod tests {
let mut t = TestHelper::default();

let endpoint = t.open_socket_and_recv_single_packet().await;

let local_addr = available_addr().await;
let local_addr = available_addr(&AddressType::Random).await;
let proxy = crate::cli::Proxy {
port: local_addr.port(),
..<_>::default()
};
let config = Arc::new(Config::default());
config.clusters.modify(|clusters| {
clusters.insert_default(vec![Endpoint::new(
endpoint.socket.local_addr().unwrap().into(),
endpoint.socket.local_ipv4_addr().unwrap().into(),
)])
});
t.run_server(config, proxy, None);
Expand All @@ -269,7 +269,7 @@ mod tests {

load_test_filters();
let endpoint = t.open_socket_and_recv_single_packet().await;
let local_addr = available_addr().await;
let local_addr = available_addr(&AddressType::Random).await;
let config = Arc::new(Config::default());
config.filters.store(
crate::filters::FilterChain::try_from(vec![config::Filter {
Expand All @@ -282,7 +282,7 @@ mod tests {
);
config.clusters.modify(|clusters| {
clusters.insert_default(vec![Endpoint::new(
endpoint.socket.local_addr().unwrap().into(),
endpoint.socket.local_ipv4_addr().unwrap().into(),
)])
});
t.run_server(
Expand Down Expand Up @@ -315,12 +315,12 @@ mod tests {
let t = TestHelper::default();

let socket = Arc::new(create_socket().await);
let addr = socket.local_addr().unwrap();
let addr = socket.local_ipv6_addr().unwrap();
let endpoint = t.open_socket_and_recv_single_packet().await;
let msg = "hello";
let config = Arc::new(Config::default());
config.clusters.modify(|clusters| {
clusters.insert_default(vec![endpoint.socket.local_addr().unwrap()])
clusters.insert_default(vec![endpoint.socket.local_ipv6_addr().unwrap()])
});

// we'll test a single DownstreamReceiveWorkerConfig
Expand Down Expand Up @@ -350,15 +350,15 @@ mod tests {

let msg = "hello";
let endpoint = t.open_socket_and_recv_single_packet().await;
let local_addr = available_addr().await;
let local_addr = available_addr(&AddressType::Random).await;
let proxy = crate::cli::Proxy {
port: local_addr.port(),
..<_>::default()
};

let config = Arc::new(crate::Config::default());
config.clusters.modify(|clusters| {
clusters.insert_default(vec![endpoint.socket.local_addr().unwrap()])
clusters.insert_default(vec![endpoint.socket.local_ipv4_addr().unwrap()])
});

proxy.run_recv_from(&config, <_>::default()).unwrap();
Expand Down
Loading

0 comments on commit f1b98a2

Please sign in to comment.