From 8497e5967e1e82f00999a59a673e4a85cfbaeafc Mon Sep 17 00:00:00 2001 From: XAMPPRocky <4464295+XAMPPRocky@users.noreply.github.com> Date: Wed, 6 Sep 2023 20:06:09 +0200 Subject: [PATCH 1/3] Update dependencies and Rust version (#782) --- Cargo.toml | 66 +++++++++++----------- build.rs | 4 +- examples/quilkin-filter-example/Cargo.toml | 20 +++---- rust-toolchain.toml | 2 +- src/cli/qcmp.rs | 7 ++- src/filters/load_balancer.rs | 2 +- src/filters/timestamp.rs | 1 + src/protocol.rs | 17 +++++- src/xds/server.rs | 12 +++- 9 files changed, 78 insertions(+), 53 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index d46b3fa647..26db776ee0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -54,70 +54,70 @@ async-stream = "0.3.5" base64.workspace = true base64-serde = "0.7.0" bytes = { version = "1.4.0", features = ["serde"] } -cached = "0.43.0" -chrono = "0.4.24" -clap = { version = "4.3.0", features = ["cargo", "derive", "env"] } -dashmap = { version = "5.4.0", features = ["serde"] } +cached = "0.45.0" +chrono = "0.4.28" +clap = { version = "4.4.2", features = ["cargo", "derive", "env"] } +dashmap = { version = "5.5.3", features = ["serde"] } dirs2 = "3.0.1" -either = "1.8.1" -enum-map = "2.5.0" +either = "1.9.0" +enum-map = "2.6.1" eyre = "0.6.8" futures.workspace = true -hyper = { version = "0.14.26", features = ["http2"] } -hyper-rustls = { version = "0.24.0", features = ["http2", "webpki-roots"] } +hyper = { version = "0.14.27", features = ["http2"] } +hyper-rustls = { version = "0.24.1", features = ["http2", "webpki-roots"] } ipnetwork = "0.20.0" k8s-openapi.workspace = true maxminddb = "0.23.0" -notify = "6.0.0" -num_cpus = "1.15.0" -once_cell = "1.17.1" +notify = "6.1.1" +num_cpus = "1.16.0" +once_cell = "1.18.0" parking_lot = "0.12.1" prometheus = { version = "0.13.3", default-features = false } -prost = "0.11.9" -prost-types = "0.11.9" +prost = "0.12.0" +prost-types = "0.12.0" rand = "0.8.5" -regex = "1.8.2" -schemars = { version = "0.8.12", features = ["chrono", "bytes", "url"] } -serde = { version = "1.0.163", features = ["derive", "rc"] } -serde_json = "1.0.96" +regex = "1.9.5" +schemars = { version = "0.8.13", features = ["chrono", "bytes", "url"] } +serde = { version = "1.0.188", features = ["derive", "rc"] } +serde_json = "1.0.105" serde_regex = "1.1.0" -serde_stacker = "0.1.8" -serde_yaml = "0.9.21" +serde_stacker = "0.1.10" +serde_yaml = "0.9.25" snap = "1.1.0" socket2 = { version = "0.5.3", features = ["all"] } stable-eyre = "0.2.2" -thiserror = "1.0.40" +thiserror = "1.0.48" tokio.workspace = true tokio-stream = { version = "0.1.14", features = ["sync"] } -tonic = "0.9.2" +tonic = "0.10.0" tracing.workspace = true tracing-futures = { version = "0.2.5", features = ["futures-03"] } tracing-subscriber = { version = "0.3.17", features = ["json", "env-filter"] } -tryhard = "0.5.0" -url = { version = "2.3.1", features = ["serde"] } -uuid = { version = "1.3.3", default-features = false, features = ["v4"] } +tryhard = "0.5.1" +url = { version = "2.4.1", features = ["serde"] } +uuid = { version = "1.4.1", default-features = false, features = ["v4"] } lasso = { version = "0.7.2", features = ["multi-threaded"] } kube.workspace = true trust-dns-resolver = { version = "0.23.0", features = ["tokio", "tokio-rustls", "dns-over-https-rustls"] } -async-trait = "0.1.68" +async-trait = "0.1.73" nom = "7.1.3" [target.'cfg(target_os = "linux")'.dependencies] sys-info = "0.9.1" [dev-dependencies] -regex = "1.8.2" -criterion = { version = "0.5.0", features = ["html_reports"] } -once_cell = "1.17.1" +regex = "1.9.5" +criterion = { version = "0.5.1", features = ["html_reports"] } +once_cell = "1.18.0" tracing-test = "0.2.4" -pretty_assertions = "1.3.0" -tempfile = "3.5.0" +pretty_assertions = "1.4.0" +tempfile = "3.8.0" rand = "0.8.5" [build-dependencies] -tonic-build = { version = "0.9.2", default_features = false, features = ["transport", "prost"] } -prost-build = "0.11.9" -built = { version = "0.6.0", features = ["git2"] } +tonic-build = { version = "0.10.0", default_features = false, features = ["transport", "prost"] } +prost-build = "0.12.0" +built = { version = "0.6.1", features = ["git2"] } protobuf-src = { version = "1.1.0", optional = true } [features] diff --git a/build.rs b/build.rs index 6e1a92008d..6a700161bc 100644 --- a/build.rs +++ b/build.rs @@ -51,7 +51,7 @@ fn main() -> Result<(), Box> { .map(|name| std::env::current_dir().unwrap().join(name)) .collect::>(); - let include_dirs = vec![ + let include_dirs = [ "proto/data-plane-api", "proto/udpa", "proto/googleapis", @@ -83,7 +83,7 @@ fn main() -> Result<(), Box> { // This tells cargo to re-run this build script only when the proto files // we're interested in change or the any of the proto directories were updated. - for path in vec![proto_files, include_dirs].concat() { + for path in [proto_files, include_dirs].concat() { println!("cargo:rerun-if-changed={}", path.to_str().unwrap()); } diff --git a/examples/quilkin-filter-example/Cargo.toml b/examples/quilkin-filter-example/Cargo.toml index 4b47b957c9..92f34944c2 100644 --- a/examples/quilkin-filter-example/Cargo.toml +++ b/examples/quilkin-filter-example/Cargo.toml @@ -27,16 +27,16 @@ edition = "2021" # If lifting this example, you will want to be explicit about the Quilkin version, e.g. # quilkin = "0.2.0" quilkin = { path = "../../" } -tokio = { version = "1.21.2", features = ["full"] } -tonic = "0.8.2" -prost = "0.11.2" -prost-types = "0.11.2" -serde = "1.0.147" -serde_yaml = "0.9.14" -bytes = "1.2.1" -schemars = "0.8.11" -async-trait = "0.1.68" +tokio = { version = "1.32", features = ["full"] } +tonic = "0.10" +prost = "0.12" +prost-types = "0.12" +serde = "1" +serde_yaml = "0.9" +bytes = "1" +schemars = "0.8" +async-trait = "0.1" [build-dependencies] -prost-build = "0.11.2" +prost-build = "0.12" protobuf-src = "1.1.0" diff --git a/rust-toolchain.toml b/rust-toolchain.toml index 6af84e59a8..19c4041fb7 100644 --- a/rust-toolchain.toml +++ b/rust-toolchain.toml @@ -13,5 +13,5 @@ # limitations under the License. [toolchain] -channel = "1.71.1" +channel = "1.72.0" components = ["rustfmt", "clippy"] diff --git a/src/cli/qcmp.rs b/src/cli/qcmp.rs index 21afe327b2..9d732ce83f 100644 --- a/src/cli/qcmp.rs +++ b/src/cli/qcmp.rs @@ -52,7 +52,12 @@ impl Ping { .await .unwrap(); - let Ok(socket_result) = tokio::time::timeout(std::time::Duration::from_secs(1), socket.recv_from(&mut buf)).await else { + let Ok(socket_result) = tokio::time::timeout( + std::time::Duration::from_secs(1), + socket.recv_from(&mut buf), + ) + .await + else { tracing::error!(endpoint=%self.endpoint, "exceeded timeout duration"); continue; }; diff --git a/src/filters/load_balancer.rs b/src/filters/load_balancer.rs index 1a625800a3..24acf97782 100644 --- a/src/filters/load_balancer.rs +++ b/src/filters/load_balancer.rs @@ -169,7 +169,7 @@ policy: RANDOM ([127, 0, 0, 3], 8080).into(), ]; let source_ips = vec![[127u8, 1, 1, 1], [127, 2, 2, 2], [127, 3, 3, 3]]; - let source_ports = vec![11111u16, 22222, 33333, 44444, 55555]; + let source_ports = [11111u16, 22222, 33333, 44444, 55555]; let yaml = "policy: HASH"; let filter = LoadBalancer::from_config(serde_yaml::from_str(yaml).unwrap()); diff --git a/src/filters/timestamp.rs b/src/filters/timestamp.rs index 5bbeda171c..e90dc70528 100644 --- a/src/filters/timestamp.rs +++ b/src/filters/timestamp.rs @@ -60,6 +60,7 @@ impl Timestamp { }; // Create a normal DateTime from the NaiveDateTime + #[allow(deprecated)] let datetime: DateTime = DateTime::from_utc(naive, Utc); let now = Utc::now(); diff --git a/src/protocol.rs b/src/protocol.rs index b8f649e0e6..b0a52f5fa7 100644 --- a/src/protocol.rs +++ b/src/protocol.rs @@ -68,7 +68,11 @@ pub async fn spawn(port: u16) -> crate::Result<()> { } }; - let Protocol::Ping { client_timestamp, nonce, } = command else { + let Protocol::Ping { + client_timestamp, + nonce, + } = command + else { tracing::warn!("rejected unsupported QCMP packet"); continue; }; @@ -206,7 +210,13 @@ impl Protocol { /// /// [ntp]: https://en.wikipedia.org/wiki/Network_Time_Protocol#Clock_synchronization_algorithm pub fn round_trip_delay(&self, client_response_timestamp: i64) -> Option { - let Protocol::PingReply { client_timestamp, server_start_timestamp, server_transmit_timestamp, .. } = self else { + let Protocol::PingReply { + client_timestamp, + server_start_timestamp, + server_transmit_timestamp, + .. + } = self + else { return None; }; @@ -243,7 +253,8 @@ impl Protocol { /// is not a QCMP packet, and returning `Err` when it was detected as a /// QCMP packet, but there was an error in parsing the payload. pub fn parse(input: &[u8]) -> Result> { - let Ok((input, _)) = complete::tag::<_, _, nom::error::Error<_>>(MAGIC_NUMBER)(input) else { + let Ok((input, _)) = complete::tag::<_, _, nom::error::Error<_>>(MAGIC_NUMBER)(input) + else { return Ok(None); }; diff --git a/src/xds/server.rs b/src/xds/server.rs index 7b778a192c..b4afb6bcc2 100644 --- a/src/xds/server.rs +++ b/src/xds/server.rs @@ -301,8 +301,16 @@ impl AggregatedControlPlaneDiscoveryService for ControlPlane { ) -> Result, tonic::Status> { tracing::info!("control plane discovery stream attempt"); let mut responses = responses.into_inner(); - let Some(identifier) = responses.next().await.ok_or_else(|| tonic::Status::cancelled("received empty first response"))??.control_plane.map(|cp| cp.identifier) else { - return Err(tonic::Status::invalid_argument("DiscoveryResponse.control_plane.identifier is required in the first message")) + let Some(identifier) = responses + .next() + .await + .ok_or_else(|| tonic::Status::cancelled("received empty first response"))?? + .control_plane + .map(|cp| cp.identifier) + else { + return Err(tonic::Status::invalid_argument( + "DiscoveryResponse.control_plane.identifier is required in the first message", + )); }; tracing::info!(%identifier, "new control plane discovery stream"); From f32387f8f1aa519223aabe2e18be548edabec453 Mon Sep 17 00:00:00 2001 From: XAMPPRocky <4464295+XAMPPRocky@users.noreply.github.com> Date: Wed, 6 Sep 2023 20:42:16 +0200 Subject: [PATCH 2/3] Late initialise upstream socket to prevent session map lock (#781) * Late initialise upstream socket to prevent session map lock * Add exception for `option-ext` * allow deprecated chrono function --- deny.toml | 3 +- src/proxy.rs | 27 ++++------- src/proxy/sessions.rs | 105 ++++++++++++++++++++++++------------------ 3 files changed, 73 insertions(+), 62 deletions(-) diff --git a/deny.toml b/deny.toml index c83f7de876..12a140ef21 100644 --- a/deny.toml +++ b/deny.toml @@ -31,7 +31,8 @@ exceptions = [ # Each entry is the crate and version constraint, and its specific allow # list { name ="webpki-roots", version = "0.25.0", allow = ["MPL-2.0"] }, - { name ="webpki-roots", version = "0.23.0", allow = ["MPL-2.0"] } + { name ="webpki-roots", version = "0.23.0", allow = ["MPL-2.0"] }, + { name ="option-ext", version = "0.2.0", allow = ["MPL-2.0"] } ] [[licenses.clarify]] diff --git a/src/proxy.rs b/src/proxy.rs index ca4f843a7b..fcc0574989 100644 --- a/src/proxy.rs +++ b/src/proxy.rs @@ -23,11 +23,10 @@ use tokio::net::UdpSocket; use crate::{ endpoint::{Endpoint, EndpointAddress}, filters::{Filter, ReadContext}, - ttl_map::TryResult, Config, }; -pub use sessions::{Session, SessionArgs, SessionKey, SessionMap}; +pub use sessions::{Session, SessionKey, SessionMap}; /// Packet received from local port #[derive(Debug)] @@ -204,25 +203,21 @@ impl DownstreamReceiveWorkerConfig { dest: endpoint.address.clone(), }; - let send_future = match sessions.try_get(&session_key) { - TryResult::Present(entry) => entry.send(packet), - TryResult::Absent => { - let session_args = SessionArgs { - config: config.clone(), - source: session_key.source.clone(), - downstream_socket: downstream_socket.clone(), - dest: endpoint.clone(), + let send_future = match sessions.get(&session_key) { + Some(entry) => entry.send(packet), + None => { + let session = Session::new( + config.clone(), + session_key.source.clone(), + downstream_socket.clone(), + endpoint.clone(), asn_info, - }; + )?; - let session = session_args.into_session().await?; let future = session.send(packet); sessions.insert(session_key, session); future } - TryResult::Locked => { - return Err(PipelineError::SessionMapLocked); - } }; send_future.await @@ -233,8 +228,6 @@ impl DownstreamReceiveWorkerConfig { pub enum PipelineError { #[error("No upstream endpoints available")] NoUpstreamEndpoints, - #[error("session map was locked")] - SessionMapLocked, #[error("filter {0}")] Filter(#[from] crate::filters::FilterError), #[error("qcmp: {0}")] diff --git a/src/proxy/sessions.rs b/src/proxy/sessions.rs index a7e142fcfd..59db2ce723 100644 --- a/src/proxy/sessions.rs +++ b/src/proxy/sessions.rs @@ -18,7 +18,12 @@ pub(crate) mod metrics; use std::sync::Arc; -use tokio::{net::UdpSocket, select, sync::watch, time::Instant}; +use tokio::{ + net::UdpSocket, + select, + sync::{watch, OnceCell}, + time::Instant, +}; use crate::{ endpoint::{Endpoint, EndpointAddress}, @@ -35,7 +40,7 @@ pub struct Session { /// created_at is time at which the session was created created_at: Instant, /// socket that sends and receives from and to the endpoint address - upstream_socket: Arc, + upstream_socket: Arc>>, /// dest is where to send data to dest: Endpoint, /// address of original sender @@ -68,63 +73,77 @@ struct ReceivedPacketContext<'a> { dest: EndpointAddress, } -pub struct SessionArgs { - pub config: Arc, - pub source: EndpointAddress, - pub downstream_socket: Arc, - pub dest: Endpoint, - pub asn_info: Option, -} - -impl SessionArgs { - /// Creates a new Session, and starts the process of receiving udp sockets - /// from its ephemeral port from endpoint(s) - pub async fn into_session(self) -> Result { - Session::new(self).await - } -} - impl Session { /// internal constructor for a Session from SessionArgs #[tracing::instrument(skip_all)] - async fn new(args: SessionArgs) -> Result { - let addr = (std::net::Ipv4Addr::UNSPECIFIED, 0); - let upstream_socket = Arc::new(UdpSocket::bind(addr).await?); - upstream_socket - .connect(args.dest.address.to_socket_addr().await?) - .await?; + pub fn new( + config: Arc, + source: EndpointAddress, + downstream_socket: Arc, + dest: Endpoint, + asn_info: Option, + ) -> Result { let (shutdown_tx, shutdown_rx) = watch::channel::<()>(()); let s = Session { - config: args.config.clone(), - upstream_socket, - source: args.source.clone(), - dest: args.dest, + config: config.clone(), + upstream_socket: Arc::new(OnceCell::new()), + source: source.clone(), + dest, created_at: Instant::now(), shutdown_tx, - asn_info: args.asn_info, + asn_info, }; tracing::debug!(source = %s.source, dest = ?s.dest, "Session created"); self::metrics::total_sessions().inc(); s.active_session_metric().inc(); - s.run(args.downstream_socket, shutdown_rx); + s.run(downstream_socket, shutdown_rx); Ok(s) } + fn upstream_socket( + &self, + ) -> impl std::future::Future, super::PipelineError>> { + let upstream_socket = self.upstream_socket.clone(); + let address = self.dest.address.clone(); + + async move { + upstream_socket + .get_or_try_init(|| async { + let upstream_socket = + UdpSocket::bind((std::net::Ipv4Addr::UNSPECIFIED, 0)).await?; + upstream_socket + .connect(address.to_socket_addr().await?) + .await?; + Ok(Arc::new(upstream_socket)) + }) + .await + .cloned() + } + } + /// run starts processing receiving upstream udp packets /// and sending them back downstream fn run(&self, downstream_socket: Arc, mut shutdown_rx: watch::Receiver<()>) { let source = self.source.clone(); let config = self.config.clone(); let endpoint = self.dest.clone(); - let upstream_socket = self.upstream_socket.clone(); + let upstream_socket = self.upstream_socket(); let asn_info = self.asn_info.clone(); tokio::spawn(async move { let mut buf: Vec = vec![0; 65535]; let mut last_received_at = None; + let upstream_socket = match upstream_socket.await { + Ok(socket) => socket, + Err(error) => { + tracing::error!(%error, "upstream socket failed to initialise"); + return; + } + }; + loop { tracing::debug!(source = %source, dest = ?endpoint, "Awaiting incoming packet"); let asn_info = asn_info.as_ref(); @@ -226,8 +245,8 @@ impl Session { contents = %crate::utils::base64_encode(buf), "sending packet upstream"); - let socket = self.upstream_socket.clone(); - async move { socket.send(buf).await.map_err(From::from) } + let socket = self.upstream_socket(); + async move { socket.await?.send(buf).await.map_err(From::from) } } } @@ -277,7 +296,7 @@ mod tests { use crate::{ endpoint::{Endpoint, EndpointAddress}, - proxy::sessions::{ReceivedPacketContext, SessionArgs}, + proxy::sessions::ReceivedPacketContext, test_utils::{create_socket, new_test_config, TestHelper}, }; @@ -289,17 +308,15 @@ mod tests { let socket = Arc::new(create_socket().await); let msg = "hello"; - let sess = Session::new(SessionArgs { - config: <_>::default(), - source: addr.clone(), - downstream_socket: socket.clone(), - dest: endpoint, - asn_info: None, - }) - .await - .unwrap(); + let sess = + Session::new(<_>::default(), addr.clone(), socket.clone(), endpoint, None).unwrap(); - sess.send(msg.as_bytes()).await.unwrap(); + sess.upstream_socket() + .await + .unwrap() + .send(msg.as_bytes()) + .await + .unwrap(); let mut buf = vec![0; 1024]; let (size, recv_addr) = timeout(Duration::from_secs(5), socket.recv_from(&mut buf)) From d653025e84192ccb748481b7480d581401d1e03e Mon Sep 17 00:00:00 2001 From: Mark Mandel Date: Wed, 6 Sep 2023 22:49:26 -0700 Subject: [PATCH 3/3] README description update (#783) --- README.md | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 9f1b38b1eb..db847a9d2a 100644 --- a/README.md +++ b/README.md @@ -9,10 +9,9 @@ [![Discord](https://img.shields.io/discord/773975408265134100)](https://discord.gg/mfBNZjBDnc) [![Twitter Follow](https://img.shields.io/twitter/follow/quilkindev?style=social)](https://twitter.com/quilkindev) -Quilkin is a non-transparent UDP proxy specifically designed for use with large scale multiplayer dedicated game servers -deployments, to ensure security, access control, telemetry data, metrics and more. - -It is designed to be used behind game clients as well as in front of dedicated game servers. +Quilkin is a non-transparent UDP proxy specifically designed for use with large scale multiplayer dedicated game +servers deployments that ensures security, access control, telemetry data, metrics and more without the end user +having to custom build and integrate this functionality into their game clients and servers directly. ## Announcements