Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Locally listen on IPv4 and IPv6 addresses #775

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion benches/throughput.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::time;

use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
use once_cell::sync::Lazy;
use quilkin::test_utils::AddressType;

const MESSAGE_SIZE: usize = 0xffff;
const DEFAULT_MESSAGE: [u8; 0xffff] = [0xff; 0xffff];
Expand Down Expand Up @@ -35,7 +36,7 @@ fn run_quilkin(port: u16, endpoint: SocketAddr) {
let proxy = quilkin::cli::Proxy {
port,
qcmp_port: runtime
.block_on(quilkin::test_utils::available_addr())
.block_on(quilkin::test_utils::available_addr(&AddressType::Random))
.port(),
..<_>::default()
};
Expand Down
6 changes: 3 additions & 3 deletions docs/src/deployment/admin.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
# Administration

| services | ports | Protocol |
|----------|-------|-----------|
| Administration | 8000 | HTTP (IPv4 OR IPv6) |
| services | ports | Protocol |
|----------------|-------|---------------------|
| Administration | 8000 | HTTP (IPv4 OR IPv6) |

## Logging
By default, Quilkin will log `INFO` level events, you can change this by setting
Expand Down
2 changes: 1 addition & 1 deletion docs/src/services/agent.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

| services | ports | Protocol |
|----------|-------|-----------|
| QCMP | 7600 | UDP(IPv4 && IPv6) |
| QCMP | 7600 | UDP(IPv4 OR IPv6) |

> **Note:** This service is currently in active experimentation and development
so there may be bugs which cause it to be unusable for production, as always
Expand Down
4 changes: 2 additions & 2 deletions docs/src/services/proxy.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

| Services | Ports | Protocol |
|----------|-------|--------------------|
| Proxy | 7777 | UDP (IPv4) |
| QCMP | 7600 | UDP (IPv4 && IPv6) |
| Proxy | 7777 | UDP (IPv4 OR IPv6) |
| QCMP | 7600 | UDP (IPv4 OR IPv6) |

"Proxy" is the primary Quilkin service, which acts as a non-transparent UDP
proxy.
Expand Down
2 changes: 1 addition & 1 deletion docs/src/services/proxy/qcmp.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

| services | ports | Protocol |
|----------|-------|-----------|
| QCMP | 7600 | UDP (IPv4 && IPv6) |
| QCMP | 7600 | UDP (IPv4 OR IPv6) |

In addition to the TCP based administration API, Quilkin provides a meta API
over UDP. The purpose of this API is to provide meta operations that can be
Expand Down
6 changes: 3 additions & 3 deletions docs/src/services/xds.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
# xDS Control Plane

| services | ports | Protocol |
|----------|-------|------------|
| xDS | 7800 | gRPC(IPv4) |
| services | ports | Protocol |
|----------|-------|---------------------|
| xDS | 7800 | gRPC (IPv4 OR IPv6) |

For multi-cluster integration, Quilkin provides a `manage` service, that can be
used with a number of configuration discovery providers to provide cluster
Expand Down
2 changes: 1 addition & 1 deletion proto/protoc-gen-validate
66 changes: 32 additions & 34 deletions src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,26 +228,21 @@ impl Cli {
#[cfg(test)]
mod tests {
use super::*;
use std::net::Ipv4Addr;
use std::net::{Ipv4Addr, SocketAddr};

use tokio::{
net::UdpSocket,
time::{timeout, Duration},
};
use tokio::time::{timeout, Duration};

use crate::{
config::{Filter, Providers},
endpoint::{Endpoint, LocalityEndpoints},
filters::{Capture, StaticFilter, TokenRouter},
test_utils::{create_socket, AddressType, TestHelper},
};

#[tokio::test]
async fn relay_routing() {
let server_port = crate::test_utils::available_addr().await.port();
let server_socket = UdpSocket::bind((Ipv4Addr::UNSPECIFIED, server_port))
.await
.map(Arc::new)
.unwrap();
let mut t = TestHelper::default();
let (mut rx, server_socket) = t.open_socket_and_recv_multiple_packets().await;
let filters_file = tempfile::NamedTempFile::new().unwrap();
let config = Config::default();

Expand Down Expand Up @@ -286,7 +281,11 @@ mod tests {
.write()
.default_cluster_mut()
.insert(LocalityEndpoints::from(vec![Endpoint::with_metadata(
(std::net::Ipv4Addr::LOCALHOST, server_port).into(),
(
std::net::Ipv4Addr::LOCALHOST,
server_socket.local_ipv4_addr().unwrap().port(),
)
.into(),
crate::endpoint::Metadata {
tokens: vec!["abc".into()].into_iter().collect(),
},
Expand All @@ -295,7 +294,9 @@ mod tests {
})
.unwrap();

let relay_admin_port = crate::test_utils::available_addr().await.port();
let relay_admin_port = crate::test_utils::available_addr(&AddressType::Random)
.await
.port();
let relay = Cli {
admin_address: Some((Ipv4Addr::LOCALHOST, relay_admin_port).into()),
config: <_>::default(),
Expand All @@ -309,7 +310,9 @@ mod tests {
}),
};

let control_plane_admin_port = crate::test_utils::available_addr().await.port();
let control_plane_admin_port = crate::test_utils::available_addr(&AddressType::Random)
.await
.port();
let control_plane = Cli {
no_admin: false,
quiet: true,
Expand All @@ -327,7 +330,9 @@ mod tests {
}),
};

let proxy_admin_port = crate::test_utils::available_addr().await.port();
let proxy_admin_port = crate::test_utils::available_addr(&AddressType::Random)
.await
.port();
let proxy = Cli {
no_admin: false,
quiet: true,
Expand All @@ -344,12 +349,9 @@ mod tests {
tokio::time::sleep(Duration::from_millis(500)).await;
tokio::spawn(proxy.drive());
tokio::time::sleep(Duration::from_millis(500)).await;
let local_addr = crate::test_utils::available_addr().await;
let socket = UdpSocket::bind((Ipv4Addr::UNSPECIFIED, local_addr.port()))
.await
.map(Arc::new)
.unwrap();
let socket = create_socket().await;
let config = Config::default();
let proxy_address: SocketAddr = (std::net::Ipv4Addr::LOCALHOST, 7777).into();

for _ in 0..5 {
let token = random_three_characters();
Expand All @@ -361,7 +363,11 @@ mod tests {
.write()
.default_cluster_mut()
.insert(LocalityEndpoints::from(vec![Endpoint::with_metadata(
(std::net::Ipv4Addr::LOCALHOST, server_port).into(),
(
std::net::Ipv4Addr::LOCALHOST,
server_socket.local_ipv4_addr().unwrap().port(),
)
.into(),
crate::endpoint::Metadata {
tokens: vec![token.clone()].into_iter().collect(),
},
Expand All @@ -373,32 +379,24 @@ mod tests {
let mut msg = Vec::from(*b"hello");
msg.extend_from_slice(&token);
tracing::info!(?token, "sending packet");
socket
.send_to(&msg, &(std::net::Ipv4Addr::LOCALHOST, 7777))
.await
.unwrap();

let recv = |socket: Arc<UdpSocket>| async move {
let mut buf = [0; u16::MAX as usize];
let length = socket.recv(&mut buf).await.unwrap();
buf[0..length].to_vec()
};
socket.send_to(&msg, &proxy_address).await.unwrap();

assert_eq!(
b"hello",
&&*timeout(Duration::from_secs(5), (recv)(server_socket.clone()))
"hello",
timeout(Duration::from_secs(5), rx.recv())
.await
.expect("should have received a packet")
.unwrap()
);

tracing::info!(?token, "received packet");

tracing::info!(?token, "sending bad packet");
// send an invalid packet
let msg = b"hello\xFF\xFF\xFF";
socket.send_to(msg, &local_addr).await.unwrap();
socket.send_to(msg, &proxy_address).await.unwrap();

let result = timeout(Duration::from_secs(3), (recv)(server_socket.clone())).await;
let result = timeout(Duration::from_secs(3), rx.recv()).await;
assert!(result.is_err(), "should not have received a packet");
tracing::info!(?token, "didn't receive bad packet");
}
Expand Down
30 changes: 15 additions & 15 deletions src/cli/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@ use std::{net::SocketAddr, sync::Arc, time::Duration};

use tonic::transport::Endpoint;

use crate::{proxy::SessionMap, utils::net, xds::ResourceType, Config, Result};
use crate::{proxy::SessionMap, xds::ResourceType, Config, Result};

#[cfg(doc)]
use crate::filters::FilterFactory;
use crate::utils::net::DualStackLocalSocket;

define_port!(7777);

Expand Down Expand Up @@ -153,7 +154,7 @@ impl Proxy {
// Contains config for each worker task.
let mut workers = Vec::with_capacity(num_workers);
for worker_id in 0..num_workers {
let socket = Arc::new(net::socket_with_reuse(self.port)?);
let socket = Arc::new(DualStackLocalSocket::new(self.port)?);
workers.push(crate::proxy::DownstreamReceiveWorkerConfig {
worker_id,
socket: socket.clone(),
Expand Down Expand Up @@ -181,7 +182,7 @@ mod tests {
use crate::{
config,
endpoint::Endpoint,
test_utils::{available_addr, create_socket, load_test_filters, TestHelper},
test_utils::{available_addr, create_socket, load_test_filters, AddressType, TestHelper},
};

#[tokio::test]
Expand All @@ -191,7 +192,7 @@ mod tests {
let endpoint1 = t.open_socket_and_recv_single_packet().await;
let endpoint2 = t.open_socket_and_recv_single_packet().await;

let local_addr = available_addr().await;
let local_addr = available_addr(&AddressType::Random).await;
let proxy = crate::cli::Proxy {
port: local_addr.port(),
..<_>::default()
Expand All @@ -200,8 +201,8 @@ mod tests {
let config = Arc::new(crate::Config::default());
config.clusters.modify(|clusters| {
clusters.insert_default(vec![
Endpoint::new(endpoint1.socket.local_addr().unwrap().into()),
Endpoint::new(endpoint2.socket.local_addr().unwrap().into()),
Endpoint::new(endpoint1.socket.local_ipv4_addr().unwrap().into()),
Endpoint::new(endpoint2.socket.local_ipv6_addr().unwrap().into()),
])
});

Expand Down Expand Up @@ -234,16 +235,15 @@ mod tests {
let mut t = TestHelper::default();

let endpoint = t.open_socket_and_recv_single_packet().await;

let local_addr = available_addr().await;
let local_addr = available_addr(&AddressType::Random).await;
let proxy = crate::cli::Proxy {
port: local_addr.port(),
..<_>::default()
};
let config = Arc::new(Config::default());
config.clusters.modify(|clusters| {
clusters.insert_default(vec![Endpoint::new(
endpoint.socket.local_addr().unwrap().into(),
endpoint.socket.local_ipv4_addr().unwrap().into(),
)])
});
t.run_server(config, proxy, None);
Expand All @@ -269,7 +269,7 @@ mod tests {

load_test_filters();
let endpoint = t.open_socket_and_recv_single_packet().await;
let local_addr = available_addr().await;
let local_addr = available_addr(&AddressType::Random).await;
let config = Arc::new(Config::default());
config.filters.store(
crate::filters::FilterChain::try_from(vec![config::Filter {
Expand All @@ -282,7 +282,7 @@ mod tests {
);
config.clusters.modify(|clusters| {
clusters.insert_default(vec![Endpoint::new(
endpoint.socket.local_addr().unwrap().into(),
endpoint.socket.local_ipv4_addr().unwrap().into(),
)])
});
t.run_server(
Expand Down Expand Up @@ -315,12 +315,12 @@ mod tests {
let t = TestHelper::default();

let socket = Arc::new(create_socket().await);
let addr = socket.local_addr().unwrap();
let addr = socket.local_ipv6_addr().unwrap();
let endpoint = t.open_socket_and_recv_single_packet().await;
let msg = "hello";
let config = Arc::new(Config::default());
config.clusters.modify(|clusters| {
clusters.insert_default(vec![endpoint.socket.local_addr().unwrap()])
clusters.insert_default(vec![endpoint.socket.local_ipv6_addr().unwrap()])
});

// we'll test a single DownstreamReceiveWorkerConfig
Expand Down Expand Up @@ -350,15 +350,15 @@ mod tests {

let msg = "hello";
let endpoint = t.open_socket_and_recv_single_packet().await;
let local_addr = available_addr().await;
let local_addr = available_addr(&AddressType::Random).await;
let proxy = crate::cli::Proxy {
port: local_addr.port(),
..<_>::default()
};

let config = Arc::new(crate::Config::default());
config.clusters.modify(|clusters| {
clusters.insert_default(vec![endpoint.socket.local_addr().unwrap()])
clusters.insert_default(vec![endpoint.socket.local_ipv4_addr().unwrap()])
});

proxy.run_recv_from(&config, <_>::default()).unwrap();
Expand Down
Loading