From ad81cc488dc42c267ebf209d659406813cb3a7a8 Mon Sep 17 00:00:00 2001 From: Erin Power Date: Fri, 12 May 2023 10:20:15 +0200 Subject: [PATCH 1/3] Remove shutdown_rx from downstream loop --- src/cli/proxy.rs | 20 ++++---------------- src/proxy.rs | 37 ++++++++++++++----------------------- 2 files changed, 18 insertions(+), 39 deletions(-) diff --git a/src/cli/proxy.rs b/src/cli/proxy.rs index 739dd904c5..4b22d7a196 100644 --- a/src/cli/proxy.rs +++ b/src/cli/proxy.rs @@ -14,9 +14,8 @@ * limitations under the License. */ -use std::{net::SocketAddr, sync::Arc}; +use std::{net::SocketAddr, sync::Arc, time::Duration}; -use tokio::{sync::watch, time::Duration}; use tonic::transport::Endpoint; use crate::{proxy::SessionMap, utils::net, xds::ResourceType, Config, Result}; @@ -115,7 +114,7 @@ impl Proxy { None }; - self.run_recv_from(&config, sessions.clone(), shutdown_rx.clone())?; + self.run_recv_from(&config, sessions.clone())?; tracing::info!("Quilkin is ready"); shutdown_rx @@ -138,12 +137,7 @@ impl Proxy { /// This function also spawns the set of worker tasks responsible for consuming packets /// off the aforementioned queue and processing them through the filter chain and session /// pipeline. - fn run_recv_from( - &self, - config: &Arc, - sessions: SessionMap, - shutdown_rx: watch::Receiver<()>, - ) -> Result<()> { + fn run_recv_from(&self, config: &Arc, sessions: SessionMap) -> Result<()> { // The number of worker tasks to spawn. Each task gets a dedicated queue to // consume packets off. let num_workers = num_cpus::get(); @@ -155,7 +149,6 @@ impl Proxy { workers.push(crate::proxy::DownstreamReceiveWorkerConfig { worker_id, socket: socket.clone(), - shutdown_rx: shutdown_rx.clone(), config: config.clone(), sessions: sessions.clone(), }) @@ -315,7 +308,6 @@ mod tests { let socket = Arc::new(create_socket().await); let addr = socket.local_addr().unwrap(); - let (_shutdown_tx, shutdown_rx) = watch::channel(()); let endpoint = t.open_socket_and_recv_single_packet().await; let msg = "hello"; let config = Arc::new(Config::default()); @@ -329,7 +321,6 @@ mod tests { socket: socket.clone(), config, sessions: <_>::default(), - shutdown_rx, } .spawn(); @@ -348,7 +339,6 @@ mod tests { #[tokio::test] async fn run_recv_from() { let t = TestHelper::default(); - let (_shutdown_tx, shutdown_rx) = watch::channel(()); let msg = "hello"; let endpoint = t.open_socket_and_recv_single_packet().await; @@ -363,9 +353,7 @@ mod tests { clusters.insert_default(vec![endpoint.socket.local_addr().unwrap()]) }); - proxy - .run_recv_from(&config, <_>::default(), shutdown_rx) - .unwrap(); + proxy.run_recv_from(&config, <_>::default()).unwrap(); let socket = create_socket().await; socket.send_to(msg.as_bytes(), &local_addr).await.unwrap(); diff --git a/src/proxy.rs b/src/proxy.rs index c6c44e9fbd..a45e97fc33 100644 --- a/src/proxy.rs +++ b/src/proxy.rs @@ -18,7 +18,7 @@ mod sessions; use std::sync::Arc; -use tokio::{net::UdpSocket, sync::watch}; +use tokio::net::UdpSocket; use crate::{ endpoint::{Endpoint, EndpointAddress}, @@ -47,8 +47,6 @@ pub(crate) struct DownstreamReceiveWorkerConfig { pub socket: Arc, pub config: Arc, pub sessions: SessionMap, - /// The worker task exits when a value is received from this shutdown channel. - pub shutdown_rx: watch::Receiver<()>, } impl DownstreamReceiveWorkerConfig { @@ -58,7 +56,6 @@ impl DownstreamReceiveWorkerConfig { socket, config, sessions, - mut shutdown_rx, } = self; tokio::spawn(async move { @@ -71,26 +68,20 @@ impl DownstreamReceiveWorkerConfig { addr = ?socket.local_addr(), "Awaiting packet" ); - tokio::select! { - result = socket.recv_from(&mut buf) => { - match result { - Ok((size, source)) => { - let packet = DownstreamPacket { - received_at: chrono::Utc::now().timestamp_nanos(), - source: source.into(), - contents: buf[..size].to_vec(), - }; - - Self::spawn_process_task(packet, source, worker_id, &socket, &config, &sessions) - } - Err(error) => { - tracing::error!(%error, "error receiving packet"); - return; - } - } + match socket.recv_from(&mut buf).await { + Ok((size, source)) => { + let packet = DownstreamPacket { + received_at: chrono::Utc::now().timestamp_nanos(), + source: source.into(), + contents: buf[..size].to_vec(), + }; + + Self::spawn_process_task( + packet, source, worker_id, &socket, &config, &sessions, + ) } - _ = shutdown_rx.changed() => { - tracing::debug!(id = worker_id, "Received shutdown signal"); + Err(error) => { + tracing::error!(%error, "error receiving packet"); return; } } From 4222eed990c5c627228bcfa7bfed312cfeaf6e4e Mon Sep 17 00:00:00 2001 From: Erin Power Date: Tue, 23 May 2023 11:12:22 +0200 Subject: [PATCH 2/3] Improve missing gameserver warning --- src/config/providers/k8s.rs | 12 ++--- src/config/providers/k8s/agones.rs | 86 +++++++++++++++++------------- 2 files changed, 55 insertions(+), 43 deletions(-) diff --git a/src/config/providers/k8s.rs b/src/config/providers/k8s.rs index 599eba90b7..a8889cfd53 100644 --- a/src/config/providers/k8s.rs +++ b/src/config/providers/k8s.rs @@ -145,12 +145,7 @@ pub fn update_endpoints_from_gameservers( } Event::Deleted(server) => { - let found = if let Some(status) = &server.status { - let port = status.ports.as_ref() - .and_then(|ports| ports.first().map(|status| status.port)) - .unwrap_or_default(); - - let endpoint = Endpoint::from((status.address.clone(), port)); + let found = if let Some(endpoint) = server.endpoint() { config.clusters.value().remove_endpoint(&endpoint) } else { config.clusters.value().remove_endpoint_if(|endpoint| { @@ -159,7 +154,10 @@ pub fn update_endpoints_from_gameservers( }; if found.is_none() { - tracing::warn!(?server, "received unknown gameserver to delete from k8s"); + tracing::warn!( + endpoint=%serde_json::to_value(server.endpoint()).unwrap(), + "received unknown gameserver to delete from k8s" + ); } } }; diff --git a/src/config/providers/k8s/agones.rs b/src/config/providers/k8s/agones.rs index 6112a20095..086df423fa 100644 --- a/src/config/providers/k8s/agones.rs +++ b/src/config/providers/k8s/agones.rs @@ -39,6 +39,53 @@ pub struct GameServer { pub status: Option, } +impl GameServer { + pub fn endpoint(&self) -> Option { + self.status.as_ref().map(|status| { + let port = status + .ports + .as_ref() + .and_then(|ports| ports.first().map(|status| status.port)) + .unwrap_or_default(); + + let tokens = self.tokens(); + let extra_metadata = { + let mut map = serde_json::Map::default(); + map.insert( + "name".into(), + self.metadata.name.clone().unwrap_or_default().into(), + ); + map + }; + + Endpoint::with_metadata( + (status.address.clone(), port).into(), + crate::metadata::MetadataView::with_unknown( + crate::endpoint::Metadata { tokens }, + extra_metadata, + ), + ) + }) + } + + pub fn tokens(&self) -> std::collections::BTreeSet> { + match self.metadata.annotations.as_ref() { + Some(annotations) => annotations + .get(QUILKIN_TOKEN_LABEL) + .map(|value| { + value + .split(',') + .map(String::from) + .map(base64::decode) + .filter_map(Result::ok) + .collect() + }) + .unwrap_or_default(), + None => <_>::default(), + } + } +} + #[derive(Clone, Debug, Deserialize, schemars::JsonSchema)] #[serde(rename_all = "camelCase")] pub struct Inner { @@ -246,42 +293,9 @@ impl TryFrom for Endpoint { type Error = tonic::Status; fn try_from(server: GameServer) -> Result { - let status = server - .status - .as_ref() - .ok_or_else(|| tonic::Status::internal("No status found for game server"))?; - let mut extra_metadata = serde_json::Map::default(); - extra_metadata.insert( - "name".into(), - server.metadata.name.clone().unwrap_or_default().into(), - ); - - let tokens = match server.metadata.annotations.as_ref() { - Some(annotations) => annotations - .get(QUILKIN_TOKEN_LABEL) - .map(|value| { - value - .split(',') - .map(String::from) - .map(crate::utils::base64_decode) - .filter_map(Result::ok) - .collect::>() - }) - .unwrap_or_default(), - None => <_>::default(), - }; - - let address = status.address.clone(); - let port = status - .ports - .as_ref() - .and_then(|ports| ports.first().map(|status| status.port)) - .unwrap_or_default(); - let filter_metadata = crate::endpoint::Metadata { tokens }; - Ok(Self::with_metadata( - (address, port).into(), - crate::metadata::MetadataView::with_unknown(filter_metadata, extra_metadata), - )) + server + .endpoint() + .ok_or_else(|| tonic::Status::internal("No status found for game server")) } } From f282ad29336a0e52f42b6390f5122e95f4a028a5 Mon Sep 17 00:00:00 2001 From: XAMPPRocky <4464295+XAMPPRocky@users.noreply.github.com> Date: Tue, 23 May 2023 11:53:54 +0200 Subject: [PATCH 3/3] Add name as property in warning --- src/config/providers/k8s.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/config/providers/k8s.rs b/src/config/providers/k8s.rs index a8889cfd53..353391ea64 100644 --- a/src/config/providers/k8s.rs +++ b/src/config/providers/k8s.rs @@ -156,6 +156,7 @@ pub fn update_endpoints_from_gameservers( if found.is_none() { tracing::warn!( endpoint=%serde_json::to_value(server.endpoint()).unwrap(), + name=%serde_json::to_value(server.metadata.name).unwrap(), "received unknown gameserver to delete from k8s" ); }