Skip to content

Commit

Permalink
Locally listen on IPv4 and IPv6 addresses
Browse files Browse the repository at this point in the history
Implemented the `DualStackLocalSocket` as a small newtype for IPv6
sockets that are configured to be able to send and receive ipv6 and ipv4
traffic on all platforms for the locally listening socket.

Updated the code base and docs and fixed several issues that
existed for handling IPv6 addresses in the code base, including updates
to the Firewall filter to be able to better manage ipv4 + ipv6 combo
rule sets.

Updated the `test_utils` framework to provide coverage in tests for
both ipv4 and ipv6 by allowing for either an IPv4 or IPv6 address to be
created from the framework.

Closes #666
  • Loading branch information
markmandel committed Sep 14, 2023
1 parent d8e93b6 commit d18516c
Show file tree
Hide file tree
Showing 35 changed files with 743 additions and 348 deletions.
3 changes: 2 additions & 1 deletion benches/throughput.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::time;

use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
use once_cell::sync::Lazy;
use quilkin::test_utils::AddressType;

const MESSAGE_SIZE: usize = 0xffff;
const DEFAULT_MESSAGE: [u8; 0xffff] = [0xff; 0xffff];
Expand Down Expand Up @@ -35,7 +36,7 @@ fn run_quilkin(port: u16, endpoint: SocketAddr) {
let proxy = quilkin::cli::Proxy {
port,
qcmp_port: runtime
.block_on(quilkin::test_utils::available_addr())
.block_on(quilkin::test_utils::available_addr(&AddressType::Random))
.port(),
..<_>::default()
};
Expand Down
6 changes: 5 additions & 1 deletion build/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,9 @@ test-quilkin: ensure-build-image
--entrypoint=cargo $(BUILD_IMAGE_TAG) clippy --tests -- -D warnings
docker run --rm $(common_rust_args) \
--entrypoint=cargo $(BUILD_IMAGE_TAG) fmt -- --check
docker run --rm $(common_rust_args) \
# --network=host because docker containers are not great at ipv6.
docker run --rm $(common_rust_args) \
--network=host \
-e RUST_BACKTRACE=1 --entrypoint=cargo $(BUILD_IMAGE_TAG) test -- --nocapture

# Run tests against the examples
Expand Down Expand Up @@ -241,8 +243,10 @@ docs:
# Start an interactive shell inside the build image
# Useful for testing, or adhoc cargo, gcloud, kubectl or terraform commands
shell: ensure-gcloud-dirs ensure-kube-dirs ensure-build-image
# we --network=host because docker containers are not great at ipv6.
docker run --rm -it $(DOCKER_RUN_ARGS) $(common_rust_args) \
$(gcloud_mount_args) $(kube_mount_args) \
--network=host \
--entrypoint=bash $(BUILD_IMAGE_TAG)

ensure-build-image: ensure-cargo-registry
Expand Down
6 changes: 3 additions & 3 deletions docs/src/deployment/admin.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
# Administration

| services | ports | Protocol |
|----------|-------|-----------|
| Administration | 8000 | HTTP (IPv4 OR IPv6) |
| services | ports | Protocol |
|----------------|-------|---------------------|
| Administration | 8000 | HTTP (IPv4 OR IPv6) |

## Logging
By default, Quilkin will log `INFO` level events, you can change this by setting
Expand Down
2 changes: 1 addition & 1 deletion docs/src/services/agent.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

| services | ports | Protocol |
|----------|-------|-----------|
| QCMP | 7600 | UDP(IPv4 && IPv6) |
| QCMP | 7600 | UDP(IPv4 OR IPv6) |

> **Note:** This service is currently in active experimentation and development
so there may be bugs which cause it to be unusable for production, as always
Expand Down
4 changes: 2 additions & 2 deletions docs/src/services/proxy.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

| Services | Ports | Protocol |
|----------|-------|--------------------|
| Proxy | 7777 | UDP (IPv4) |
| QCMP | 7600 | UDP (IPv4 && IPv6) |
| Proxy | 7777 | UDP (IPv4 OR IPv6) |
| QCMP | 7600 | UDP (IPv4 OR IPv6) |

"Proxy" is the primary Quilkin service, which acts as a non-transparent UDP
proxy.
Expand Down
12 changes: 7 additions & 5 deletions docs/src/services/proxy/filters/firewall.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,17 @@ filters:
config:
on_read:
- action: ALLOW
source: 192.168.51.0/24
sources:
- 192.168.51.0/24
ports:
- 10
- 1000-7000
- 10
- 1000-7000
on_write:
- action: DENY
source: 192.168.51.0/24
sources:
- 192.168.51.0/24
ports:
- 7000
- 7000
clusters:
default:
localities:
Expand Down
2 changes: 1 addition & 1 deletion docs/src/services/proxy/qcmp.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

| services | ports | Protocol |
|----------|-------|-----------|
| QCMP | 7600 | UDP (IPv4 && IPv6) |
| QCMP | 7600 | UDP (IPv4 OR IPv6) |

In addition to the TCP based administration API, Quilkin provides a meta API
over UDP. The purpose of this API is to provide meta operations that can be
Expand Down
6 changes: 3 additions & 3 deletions docs/src/services/xds.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
# xDS Control Plane

| services | ports | Protocol |
|----------|-------|------------|
| xDS | 7800 | gRPC(IPv4) |
| services | ports | Protocol |
|----------|-------|---------------------|
| xDS | 7800 | gRPC (IPv4 OR IPv6) |

For multi-cluster integration, Quilkin provides a `manage` service, that can be
used with a number of configuration discovery providers to provide cluster
Expand Down
2 changes: 1 addition & 1 deletion proto/quilkin/filters/firewall/v1alpha1/firewall.proto
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ message Firewall {

message Rule {
Action action = 1;
string source = 2;
repeated string sources = 2;
repeated PortRange ports = 3;
}

Expand Down
66 changes: 32 additions & 34 deletions src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,26 +262,21 @@ impl Cli {
#[cfg(test)]
mod tests {
use super::*;
use std::net::Ipv4Addr;
use std::net::{Ipv4Addr, SocketAddr};

use tokio::{
net::UdpSocket,
time::{timeout, Duration},
};
use tokio::time::{timeout, Duration};

use crate::{
config::{Filter, Providers},
endpoint::{Endpoint, LocalityEndpoints},
filters::{Capture, StaticFilter, TokenRouter},
test_utils::{create_socket, AddressType, TestHelper},
};

#[tokio::test]
async fn relay_routing() {
let server_port = crate::test_utils::available_addr().await.port();
let server_socket = UdpSocket::bind((Ipv4Addr::UNSPECIFIED, server_port))
.await
.map(Arc::new)
.unwrap();
let mut t = TestHelper::default();
let (mut rx, server_socket) = t.open_socket_and_recv_multiple_packets().await;
let filters_file = tempfile::NamedTempFile::new().unwrap();
let config = Config::default();

Expand Down Expand Up @@ -320,7 +315,11 @@ mod tests {
.write()
.default_cluster_mut()
.insert(LocalityEndpoints::from(vec![Endpoint::with_metadata(
(std::net::Ipv4Addr::LOCALHOST, server_port).into(),
(
std::net::Ipv4Addr::LOCALHOST,
server_socket.local_ipv4_addr().unwrap().port(),
)
.into(),
crate::endpoint::Metadata {
tokens: vec!["abc".into()].into_iter().collect(),
},
Expand All @@ -329,7 +328,9 @@ mod tests {
})
.unwrap();

let relay_admin_port = crate::test_utils::available_addr().await.port();
let relay_admin_port = crate::test_utils::available_addr(&AddressType::Random)
.await
.port();
let relay = Cli {
admin_address: Some((Ipv4Addr::LOCALHOST, relay_admin_port).into()),
config: <_>::default(),
Expand All @@ -344,7 +345,9 @@ mod tests {
log_format: LogFormats::default(),
};

let control_plane_admin_port = crate::test_utils::available_addr().await.port();
let control_plane_admin_port = crate::test_utils::available_addr(&AddressType::Random)
.await
.port();
let control_plane = Cli {
no_admin: false,
quiet: true,
Expand All @@ -363,7 +366,9 @@ mod tests {
log_format: LogFormats::default(),
};

let proxy_admin_port = crate::test_utils::available_addr().await.port();
let proxy_admin_port = crate::test_utils::available_addr(&AddressType::Random)
.await
.port();
let proxy = Cli {
no_admin: false,
quiet: true,
Expand All @@ -381,12 +386,9 @@ mod tests {
tokio::time::sleep(Duration::from_millis(500)).await;
tokio::spawn(proxy.drive());
tokio::time::sleep(Duration::from_millis(500)).await;
let local_addr = crate::test_utils::available_addr().await;
let socket = UdpSocket::bind((Ipv4Addr::UNSPECIFIED, local_addr.port()))
.await
.map(Arc::new)
.unwrap();
let socket = create_socket().await;
let config = Config::default();
let proxy_address: SocketAddr = (std::net::Ipv4Addr::LOCALHOST, 7777).into();

for _ in 0..5 {
let token = random_three_characters();
Expand All @@ -398,7 +400,11 @@ mod tests {
.write()
.default_cluster_mut()
.insert(LocalityEndpoints::from(vec![Endpoint::with_metadata(
(std::net::Ipv4Addr::LOCALHOST, server_port).into(),
(
std::net::Ipv4Addr::LOCALHOST,
server_socket.local_ipv4_addr().unwrap().port(),
)
.into(),
crate::endpoint::Metadata {
tokens: vec![token.clone()].into_iter().collect(),
},
Expand All @@ -410,32 +416,24 @@ mod tests {
let mut msg = Vec::from(*b"hello");
msg.extend_from_slice(&token);
tracing::info!(?token, "sending packet");
socket
.send_to(&msg, &(std::net::Ipv4Addr::LOCALHOST, 7777))
.await
.unwrap();

let recv = |socket: Arc<UdpSocket>| async move {
let mut buf = [0; u16::MAX as usize];
let length = socket.recv(&mut buf).await.unwrap();
buf[0..length].to_vec()
};
socket.send_to(&msg, &proxy_address).await.unwrap();

assert_eq!(
b"hello",
&&*timeout(Duration::from_secs(5), (recv)(server_socket.clone()))
"hello",
timeout(Duration::from_secs(5), rx.recv())
.await
.expect("should have received a packet")
.unwrap()
);

tracing::info!(?token, "received packet");

tracing::info!(?token, "sending bad packet");
// send an invalid packet
let msg = b"hello\xFF\xFF\xFF";
socket.send_to(msg, &local_addr).await.unwrap();
socket.send_to(msg, &proxy_address).await.unwrap();

let result = timeout(Duration::from_secs(3), (recv)(server_socket.clone())).await;
let result = timeout(Duration::from_secs(3), rx.recv()).await;
assert!(result.is_err(), "should not have received a packet");
tracing::info!(?token, "didn't receive bad packet");
}
Expand Down
30 changes: 15 additions & 15 deletions src/cli/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@ use std::{net::SocketAddr, sync::Arc, time::Duration};

use tonic::transport::Endpoint;

use crate::{proxy::SessionMap, utils::net, xds::ResourceType, Config, Result};
use crate::{proxy::SessionMap, xds::ResourceType, Config, Result};

#[cfg(doc)]
use crate::filters::FilterFactory;
use crate::utils::net::DualStackLocalSocket;

define_port!(7777);

Expand Down Expand Up @@ -153,7 +154,7 @@ impl Proxy {
// Contains config for each worker task.
let mut workers = Vec::with_capacity(num_workers);
for worker_id in 0..num_workers {
let socket = Arc::new(net::socket_with_reuse(self.port)?);
let socket = Arc::new(DualStackLocalSocket::new(self.port)?);
workers.push(crate::proxy::DownstreamReceiveWorkerConfig {
worker_id,
socket: socket.clone(),
Expand Down Expand Up @@ -181,7 +182,7 @@ mod tests {
use crate::{
config,
endpoint::Endpoint,
test_utils::{available_addr, create_socket, load_test_filters, TestHelper},
test_utils::{available_addr, create_socket, load_test_filters, AddressType, TestHelper},
};

#[tokio::test]
Expand All @@ -191,7 +192,7 @@ mod tests {
let endpoint1 = t.open_socket_and_recv_single_packet().await;
let endpoint2 = t.open_socket_and_recv_single_packet().await;

let local_addr = available_addr().await;
let local_addr = available_addr(&AddressType::Random).await;
let proxy = crate::cli::Proxy {
port: local_addr.port(),
..<_>::default()
Expand All @@ -200,8 +201,8 @@ mod tests {
let config = Arc::new(crate::Config::default());
config.clusters.modify(|clusters| {
clusters.insert_default(vec![
Endpoint::new(endpoint1.socket.local_addr().unwrap().into()),
Endpoint::new(endpoint2.socket.local_addr().unwrap().into()),
Endpoint::new(endpoint1.socket.local_ipv4_addr().unwrap().into()),
Endpoint::new(endpoint2.socket.local_ipv6_addr().unwrap().into()),
])
});

Expand Down Expand Up @@ -234,16 +235,15 @@ mod tests {
let mut t = TestHelper::default();

let endpoint = t.open_socket_and_recv_single_packet().await;

let local_addr = available_addr().await;
let local_addr = available_addr(&AddressType::Random).await;
let proxy = crate::cli::Proxy {
port: local_addr.port(),
..<_>::default()
};
let config = Arc::new(Config::default());
config.clusters.modify(|clusters| {
clusters.insert_default(vec![Endpoint::new(
endpoint.socket.local_addr().unwrap().into(),
endpoint.socket.local_ipv4_addr().unwrap().into(),
)])
});
t.run_server(config, proxy, None);
Expand All @@ -269,7 +269,7 @@ mod tests {

load_test_filters();
let endpoint = t.open_socket_and_recv_single_packet().await;
let local_addr = available_addr().await;
let local_addr = available_addr(&AddressType::Random).await;
let config = Arc::new(Config::default());
config.filters.store(
crate::filters::FilterChain::try_from(vec![config::Filter {
Expand All @@ -282,7 +282,7 @@ mod tests {
);
config.clusters.modify(|clusters| {
clusters.insert_default(vec![Endpoint::new(
endpoint.socket.local_addr().unwrap().into(),
endpoint.socket.local_ipv4_addr().unwrap().into(),
)])
});
t.run_server(
Expand Down Expand Up @@ -315,12 +315,12 @@ mod tests {
let t = TestHelper::default();

let socket = Arc::new(create_socket().await);
let addr = socket.local_addr().unwrap();
let addr = socket.local_ipv6_addr().unwrap();
let endpoint = t.open_socket_and_recv_single_packet().await;
let msg = "hello";
let config = Arc::new(Config::default());
config.clusters.modify(|clusters| {
clusters.insert_default(vec![endpoint.socket.local_addr().unwrap()])
clusters.insert_default(vec![endpoint.socket.local_ipv6_addr().unwrap()])
});

// we'll test a single DownstreamReceiveWorkerConfig
Expand Down Expand Up @@ -350,15 +350,15 @@ mod tests {

let msg = "hello";
let endpoint = t.open_socket_and_recv_single_packet().await;
let local_addr = available_addr().await;
let local_addr = available_addr(&AddressType::Random).await;
let proxy = crate::cli::Proxy {
port: local_addr.port(),
..<_>::default()
};

let config = Arc::new(crate::Config::default());
config.clusters.modify(|clusters| {
clusters.insert_default(vec![endpoint.socket.local_addr().unwrap()])
clusters.insert_default(vec![endpoint.socket.local_ipv4_addr().unwrap()])
});

proxy.run_recv_from(&config, <_>::default()).unwrap();
Expand Down
Loading

0 comments on commit d18516c

Please sign in to comment.