diff --git a/Cargo.toml b/Cargo.toml index 2788028944..c00d5cb49d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -41,7 +41,6 @@ test = true quilkin-macros = { version = "0.3.0-dev", path = "./macros" } # Crates.io -backoff = "0.3.0" base64 = "0.13.0" base64-serde = "0.6.1" bytes = "1.1.0" @@ -68,6 +67,7 @@ tokio-stream = "0.1.7" tonic = "0.5.2" uuid = { version = "0.8.2", default-features = false, features = ["v4"] } thiserror = "1.0.30" +tryhard = "0.4.0" eyre = "0.6.5" stable-eyre = "0.2.2" ipnetwork = "0.18.0" diff --git a/src/filters/manager.rs b/src/filters/manager.rs index 6f72279c90..4be472b69b 100644 --- a/src/filters/manager.rs +++ b/src/filters/manager.rs @@ -33,6 +33,7 @@ pub struct FilterManager { } /// ListenerManagerArgs contains arguments when invoking the LDS resource manager. +#[derive(Clone)] pub(crate) struct ListenerManagerArgs { pub filter_chain_updates_tx: mpsc::Sender>, pub filter_registry: FilterRegistry, diff --git a/src/proxy/server/resource_manager.rs b/src/proxy/server/resource_manager.rs index ec9c969861..97491c708d 100644 --- a/src/proxy/server/resource_manager.rs +++ b/src/proxy/server/resource_manager.rs @@ -22,7 +22,7 @@ use crate::{ manager::{FilterManager, ListenerManagerArgs, SharedFilterManager}, FilterChain, FilterRegistry, }, - xds::ads_client::{AdsClient, ClusterUpdate, ExecutionResult, UPDATES_CHANNEL_BUFFER_SIZE}, + xds::ads_client::{AdsClient, ClusterUpdate, UPDATES_CHANNEL_BUFFER_SIZE}, }; use prometheus::Registry; use slog::{o, warn, Logger}; @@ -39,7 +39,7 @@ pub(super) struct StaticResourceManagers { pub(super) struct DynamicResourceManagers { pub(super) cluster_manager: SharedClusterManager, pub(super) filter_manager: SharedFilterManager, - pub(super) execution_result_rx: oneshot::Receiver, + pub(super) execution_result_rx: oneshot::Receiver>, } impl StaticResourceManagers { @@ -64,7 +64,7 @@ struct SpawnAdsClient { management_servers: Vec, cluster_updates_tx: mpsc::Sender, listener_manager_args: ListenerManagerArgs, - execution_result_tx: oneshot::Sender, + execution_result_tx: oneshot::Sender>, shutdown_rx: watch::Receiver<()>, } @@ -89,7 +89,7 @@ impl DynamicResourceManagers { filter_chain_updates_tx, ); - let (execution_result_tx, execution_result_rx) = oneshot::channel::(); + let (execution_result_tx, execution_result_rx) = oneshot::channel::>(); Self::spawn_ads_client(SpawnAdsClient { log: log.clone(), metrics_registry: metrics_registry.clone(), diff --git a/src/xds/ads_client.rs b/src/xds/ads_client.rs index 53190763c7..98cba36b34 100644 --- a/src/xds/ads_client.rs +++ b/src/xds/ads_client.rs @@ -16,12 +16,13 @@ use std::collections::HashMap; -use crate::xds::google::rpc::Status as GrpcStatus; -use backoff::{backoff::Backoff, exponential::ExponentialBackoff, Clock, SystemClock}; use prometheus::{Registry, Result as MetricsResult}; use slog::{debug, error, info, o, warn, Logger}; use tokio::{ - sync::{mpsc, watch}, + sync::{ + mpsc::{self, error::SendError}, + watch, + }, task::JoinHandle, }; use tokio_stream::wrappers::ReceiverStream; @@ -29,84 +30,35 @@ use tonic::{ transport::{channel::Channel as TonicChannel, Error as TonicError}, Request, }; - -use crate::cluster::Cluster; -use crate::config::ManagementServer; -use crate::filters::manager::ListenerManagerArgs; -use crate::xds::cluster::ClusterManager; -use crate::xds::envoy::config::core::v3::Node; -use crate::xds::envoy::service::discovery::v3::{ - aggregated_discovery_service_client::AggregatedDiscoveryServiceClient, DiscoveryRequest, +use tryhard::{ + backoff_strategies::{BackoffStrategy, ExponentialBackoff}, + RetryFutureConfig, RetryPolicy, }; -use crate::xds::listener::ListenerManager; -use crate::xds::metrics::Metrics; -use crate::xds::{CLUSTER_TYPE, ENDPOINT_TYPE, LISTENER_TYPE}; -use prometheus::core::{AtomicU64, GenericGauge}; -use tokio::sync::mpsc::error::SendError; - -/// AdsClient is a client that can talk to an XDS server using the ADS protocol. -pub(crate) struct AdsClient { - log: Logger, - metrics: Metrics, -} - -/// Contains the components that handle XDS responses for supported resources. -struct ResourceHandlers { - cluster_manager: ClusterManager, - listener_manager: ListenerManager, -} - -impl ResourceHandlers { - // Clear any stale state before (re)connecting. - pub fn on_reconnect(&mut self) { - self.cluster_manager.on_reconnect(); - } -} - -/// Represents the required arguments to start an rpc session with a server. -struct RpcSessionArgs<'a> { - log: Logger, - metrics: Metrics, - server_addr: String, - node_id: String, - resource_handlers: ResourceHandlers, - backoff: ExponentialBackoff, - discovery_req_rx: &'a mut mpsc::Receiver, - shutdown_rx: watch::Receiver<()>, -} - -enum RpcSessionError { - InitialConnect( - ResourceHandlers, - ExponentialBackoff, - TonicError, - ), - Receive( - ResourceHandlers, - ExponentialBackoff, - tonic::Status, - ), - NonRecoverable(&'static str, Box), -} -/// Represents the outcome of an rpc session with a server. -/// We return the resource handlers back so that they can be reused -/// without running into any lifetime issues. -type RpcSessionResult = Result; - -/// Represents an error encountered during a client execution. -#[derive(Debug)] -pub enum ExecutionError { - BackoffLimitExceeded, - Message(String), -} +use crate::{ + cluster::Cluster, + config::ManagementServer, + filters::manager::ListenerManagerArgs, + xds::{ + cluster::ClusterManager, + envoy::{ + config::core::v3::Node, + service::discovery::v3::{ + aggregated_discovery_service_client::AggregatedDiscoveryServiceClient, + DiscoveryRequest, DiscoveryResponse, + }, + }, + google::rpc::Status as GrpcStatus, + listener::ListenerManager, + metrics::Metrics, + CLUSTER_TYPE, ENDPOINT_TYPE, LISTENER_TYPE, + }, + Result, +}; /// Represents a full snapshot the all clusters. pub type ClusterUpdate = HashMap; -/// Represents the result of a client execution. -pub type ExecutionResult = Result<(), ExecutionError>; - /// Use a bounded channel of size 1 on the channels between /// - the xds listeners and their associated resource manager. /// - the xds listeners and the xds server. @@ -116,12 +68,19 @@ pub type ExecutionResult = Result<(), ExecutionError>; /// so in that time we don't request more updates from the server) pub const UPDATES_CHANNEL_BUFFER_SIZE: usize = 1; +/// AdsClient is a client that can talk to an XDS server using the ADS protocol. +pub(crate) struct AdsClient { + log: Logger, + metrics: Metrics, +} + impl AdsClient { pub fn new(base_logger: Logger, metrics_registry: &Registry) -> MetricsResult { let log = base_logger.new(o!("source" => "xds::AdsClient")); let metrics = Metrics::new(metrics_registry)?; Ok(Self { log, metrics }) } + /// Continuously tracks CDS and EDS resources on an ADS server, /// sending summarized cluster updates on the provided channel. pub async fn run( @@ -131,145 +90,193 @@ impl AdsClient { cluster_updates_tx: mpsc::Sender, listener_manager_args: ListenerManagerArgs, mut shutdown_rx: watch::Receiver<()>, - ) -> ExecutionResult { - let mut backoff = ExponentialBackoff:: { - // If we hit connectivity issues, always backoff and retry. - max_elapsed_time: None, - ..Default::default() - }; - + ) -> Result<()> { let log = self.log; let metrics = self.metrics; - let (discovery_req_tx, mut discovery_req_rx) = - mpsc::channel::(UPDATES_CHANNEL_BUFFER_SIZE); - let cluster_manager = - ClusterManager::new(log.clone(), cluster_updates_tx, discovery_req_tx.clone()); - let listener_manager = - ListenerManager::new(log.clone(), listener_manager_args, discovery_req_tx); + let mut server_iter = management_servers.iter().cycle(); - let mut resource_handlers = ResourceHandlers { - cluster_manager, - listener_manager, - }; + let retry_config = RetryFutureConfig::new(u32::MAX).custom_backoff(|attempt, error: &_| { + let mut backoff = ExponentialBackoff::new(std::time::Duration::from_millis(500)); - // Run the client in a loop. - // If the connection fails, we retry (with another server if available). - let mut next_server_index = 0; - loop { - // Clear any stale state before (re)connecting. - resource_handlers.on_reconnect(); - - // Pick a server to talk to. - let server_addr = { - let server_addr = management_servers - .get(next_server_index % management_servers.len()) - .map(|server| server.address.clone()) - // We have previously validated that a config provides at least one - // server address so this default value shouldn't be necessary. - .unwrap_or_else(|| "127.0.0.1:18000".into()); - next_server_index += 1; - server_addr + match error { + RpcSessionError::NonRecoverable(msg, err) => { + error!(log, "{}\n{}", msg, err); + RetryPolicy::Break + } + + RpcSessionError::InitialConnect(ref err) => { + error!(log, "Unable to connect to the XDS server"; "error" => %err); + + // Do not retry if this is an invalid URL error that we cannot recover from. + // Need to use {:?} as the Display output only returns 'transport error' + let err_description = format!("{:?}", err); + if err_description.to_lowercase().contains("invalid url") { + RetryPolicy::Break + } else { + RetryPolicy::Delay(backoff.delay(attempt, &error)) + } + } + + RpcSessionError::Receive(ref status) => { + error!(log, "Failed to receive response from XDS server"; "status" => #?status); + RetryPolicy::Delay(backoff.delay(attempt, &error)) + } + } + }); + + let session_shutdown_rx = shutdown_rx.clone(); + let handle = tryhard::retry_fn(|| { + let (discovery_req_tx, discovery_req_rx) = + mpsc::channel::(UPDATES_CHANNEL_BUFFER_SIZE); + let cluster_manager = ClusterManager::new( + log.clone(), + cluster_updates_tx.clone(), + discovery_req_tx.clone(), + ); + let listener_manager = + ListenerManager::new(log.clone(), listener_manager_args.clone(), discovery_req_tx); + + let resource_handlers = ResourceHandlers { + cluster_manager, + listener_manager, }; - let args = RpcSessionArgs { + RpcSession { + discovery_req_rx, log: log.clone(), metrics: metrics.clone(), - server_addr: server_addr.clone(), node_id: node_id.clone(), + // server_iter is guaranteed to always have at least one entry. + addr: server_iter + .next() + .map(|server| server.address.to_string()) + .unwrap(), resource_handlers, - backoff, - discovery_req_rx: &mut discovery_req_rx, - shutdown_rx: shutdown_rx.clone(), + shutdown_rx: session_shutdown_rx.clone(), + } + .run() + }) + .with_config(retry_config); + + tokio::select! { + result = handle => result.map(drop).map_err(|error| eyre::eyre!(error)), + _ = shutdown_rx.changed() => { + info!(log, "Stopping client execution - received shutdown signal."); + Ok(()) + }, + } + } +} + +/// Represents the receiving side of the RPC channel. +pub struct RpcReceiver { + client: AggregatedDiscoveryServiceClient, + log: Logger, + metrics: Metrics, + resource_handlers: ResourceHandlers, + rpc_rx: mpsc::Receiver, + shutdown_rx: watch::Receiver<()>, +} + +impl RpcReceiver { + /// Spawns the task that runs a receive loop. + fn run(mut self) -> JoinHandle> { + tokio::spawn(async move { + let mut response_stream = match self + .client + .stream_aggregated_resources(Request::new(ReceiverStream::new(self.rpc_rx))) + .await + { + Ok(response) => response.into_inner(), + Err(err) => return Err(RpcSessionError::Receive(err)), }; - tokio::select! { - result = Self::run_rpc_session(args) => { - match result { - Ok(_) => return Ok(()), - Err(RpcSessionError::NonRecoverable(msg, err)) => { - error!(log, "{}", msg); - return Err(ExecutionError::Message(format!("{:?}", err))); - } - Err(RpcSessionError::InitialConnect(handlers, bk_off, err)) => { - resource_handlers = handlers; - backoff = bk_off; - - // Do not retry if this is an invalid URL error that we cannot recover from. - // Need to use {:?} as the Display output only returns 'transport error' - let err_description = format!("{:?}", err); - if err_description.to_lowercase().contains("invalid url") { - return Err(ExecutionError::Message(err_description)); - } - error!(log, "Unable to connect to the XDS server"; "address" => server_addr, "error" => %err); - Self::backoff( - &log, - &mut backoff - ).await?; - } - Err(RpcSessionError::Receive(handlers, bk_off, status)) => { - resource_handlers = handlers; - backoff = bk_off; - error!(log, "Failed to receive response from XDS server"; "address" => server_addr, "status" => #?status); - Self::backoff( - &log, - &mut backoff - ).await?; + // We are now connected to the server. + self.metrics.connected_state.set(1); + + let result = loop { + tokio::select! { + response = response_stream.message() => { + let response = match response { + Ok(None) => { + // No more messages on the connection. + info!(self.log, "Exiting receive loop - response stream closed."); + break Ok(()) + }, + Err(err) => break Err(RpcSessionError::Receive(err)), + Ok(Some(response)) => response + }; + + self.metrics.update_attempt_total.inc(); + if let Err(url) = self.resource_handlers.handle_discovery_response(response).await { + self.metrics.update_failure_total.inc(); + error!(self.log, "Unexpected resource"; "type" => url); } } - }, - _ = shutdown_rx.changed() => { - info!(log, "Stopping client execution - received shutdown signal."); - return Ok(()) - }, - } - } + _ = self.shutdown_rx.changed() => { + info!(self.log, "Exiting receive loop - received shutdown signal"); + break Ok(()) + } + } + }; + + // We are no longer connected. + self.metrics.connected_state.set(0); + + result + }) } +} +/// Represents a complete aDS gRPC session. +pub struct RpcSession { + discovery_req_rx: mpsc::Receiver, + log: Logger, + metrics: Metrics, + node_id: String, + addr: String, + resource_handlers: ResourceHandlers, + shutdown_rx: watch::Receiver<()>, +} + +impl RpcSession { /// Executes an RPC session with a server. /// A session consists of two concurrent rpc loops executing the XDS protocol /// together with a ClusterManager. One loop (receive loop) receives /// responses from the server, forwarding them to the ClusterManager /// while the other loop (send loop) waits for DiscoveryRequest ACKS/NACKS /// from the ClusterManager, forwarding them to the server. - async fn run_rpc_session(args: RpcSessionArgs<'_>) -> RpcSessionResult { - let RpcSessionArgs { - log, - metrics, - server_addr, - node_id, - resource_handlers, - backoff, - discovery_req_rx, - shutdown_rx, - } = args; - let client = match AggregatedDiscoveryServiceClient::connect(server_addr).await { + async fn run(mut self) -> Result<(), RpcSessionError> { + let client = match AggregatedDiscoveryServiceClient::connect(self.addr).await { Ok(client) => client, - Err(err) => { - return Err(RpcSessionError::InitialConnect( - resource_handlers, - backoff, - err, - )) - } + Err(err) => return Err(RpcSessionError::InitialConnect(err)), }; - let (mut rpc_tx, rpc_rx) = mpsc::channel::(UPDATES_CHANNEL_BUFFER_SIZE); + let (rpc_tx, rpc_rx) = mpsc::channel::(UPDATES_CHANNEL_BUFFER_SIZE); // Spawn a task that runs the receive loop. - let mut recv_loop_join_handle = Self::run_receive_loop( - log.clone(), - metrics.clone(), + let mut recv_loop_join_handle = RpcReceiver { client, + log: self.log.clone(), + metrics: self.metrics.clone(), + resource_handlers: self.resource_handlers, rpc_rx, - resource_handlers, - backoff, - shutdown_rx, - ); + shutdown_rx: self.shutdown_rx, + } + .run(); + + let sender = RpcSender { + log: self.log.clone(), + metrics: self.metrics.clone(), + rpc_tx, + }; // Fetch the initial set of resources. - Self::send_initial_cds_and_lds_request(&log, &metrics, node_id, &mut rpc_tx).await?; + sender + .send_initial_cds_and_lds_request(self.node_id) + .await?; // Run the send loop on the current task. loop { @@ -281,16 +288,16 @@ impl AdsClient { Err(RpcSessionError::NonRecoverable( "receive loop encountered an error", Box::new(err)))), - req = discovery_req_rx.recv() => { + req = self.discovery_req_rx.recv() => { if let Some(req) = req { - Self::send_discovery_request(&log, &metrics, req, &mut rpc_tx) + sender.send_discovery_request(req) .await .map_err(|err| RpcSessionError::NonRecoverable( "failed to send discovery request on channel", Box::new(err)) )?; } else { - info!(log, "Exiting send loop"); + info!(self.log, "Exiting send loop"); break; } } @@ -306,41 +313,33 @@ impl AdsClient { )) }) } +} - #[allow(deprecated)] +struct RpcSender { + log: Logger, + metrics: Metrics, + rpc_tx: mpsc::Sender, +} + +impl RpcSender { async fn send_initial_cds_and_lds_request( - log: &Logger, - metrics: &Metrics, + &self, node_id: String, - rpc_tx: &mut mpsc::Sender, ) -> Result<(), RpcSessionError> { for resource_type in &[CLUSTER_TYPE, LISTENER_TYPE] { - let send_result = Self::send_discovery_request( - log, - metrics, - DiscoveryRequest { - version_info: "".into(), + let send_result = self + .send_discovery_request(DiscoveryRequest { node: Some(Node { id: node_id.clone(), - cluster: "".into(), - metadata: None, - dynamic_parameters: Default::default(), - locality: None, user_agent_name: "quilkin".into(), - extensions: vec![], - client_features: vec![], - listening_addresses: vec![], - user_agent_version_type: None, + ..Node::default() }), resource_names: vec![], // Wildcard mode. type_url: (*resource_type).into(), - response_nonce: "".into(), - error_detail: None, - }, - rpc_tx, - ) - .await - .map_err(|err| + ..DiscoveryRequest::default() + }) + .await + .map_err(|err| // An error sending means we have no listener on the other side which // would likely be a bug if we're not already shutting down. RpcSessionError::NonRecoverable( @@ -356,115 +355,65 @@ impl AdsClient { Ok(()) } - // Spawns a task that runs a receive loop. - fn run_receive_loop( - log: Logger, - metrics: Metrics, - mut client: AggregatedDiscoveryServiceClient, - rpc_rx: mpsc::Receiver, - mut resource_handlers: ResourceHandlers, - mut backoff: ExponentialBackoff, - mut shutdown_rx: watch::Receiver<()>, - ) -> JoinHandle { - tokio::spawn(async move { - let mut response_stream = match client - .stream_aggregated_resources(Request::new(ReceiverStream::new(rpc_rx))) - .await - { - Ok(response) => response.into_inner(), - Err(err) => return Err(RpcSessionError::Receive(resource_handlers, backoff, err)), - }; - - // This updates metrics for connection state. It updates the metric to 1 upon - // creation and back to 0 once it goes out of scope (i.e when the function returns, - // we are no longer connected since the client must have been dropped as well). - struct ConnectionState(GenericGauge); - impl ConnectionState { - fn connected(metric: GenericGauge) -> Self { - metric.set(1); - Self(metric) - } - } - impl Drop for ConnectionState { - fn drop(&mut self) { - self.0.set(0); - } - } - - // We are now connected to the server. - let _connected_state = ConnectionState::connected(metrics.connected_state); - - loop { - tokio::select! { - response = response_stream.message() => { - let response = match response { - Ok(None) => { - // No more messages on the connection. - info!(log, "Exiting receive loop - response stream closed."); - return Ok(resource_handlers) - }, - Err(err) => return Err(RpcSessionError::Receive(resource_handlers, backoff, err)), - Ok(Some(response)) => response - }; - - // Reset backoff timer if needed, now that we have - // successfully reached the server. - backoff.reset(); - - metrics.update_attempt_total.inc(); - if response.type_url == CLUSTER_TYPE { - resource_handlers.cluster_manager.on_cluster_response(response).await; - } else if response.type_url == ENDPOINT_TYPE { - resource_handlers.cluster_manager.on_cluster_load_assignment_response(response).await; - } else if response.type_url == LISTENER_TYPE { - resource_handlers.listener_manager.on_listener_response(response).await; - } else { - metrics.update_failure_total.inc(); - error!(log, "Unexpected resource"; "type" => response.type_url); - } - } - - _ = shutdown_rx.changed() => { - info!(log, "Exiting receive loop - received shutdown signal"); - return Ok(resource_handlers) - } - } - } - }) - } - async fn send_discovery_request( - log: &Logger, - metrics: &Metrics, + &self, req: DiscoveryRequest, - req_tx: &mut mpsc::Sender, ) -> Result<(), SendError> { if req.error_detail.is_some() { - metrics.update_failure_total.inc(); + self.metrics.update_failure_total.inc(); } else { - metrics.update_success_total.inc(); + self.metrics.update_success_total.inc(); } - metrics.requests_total.inc(); - debug!(log, "Sending rpc discovery"; "request" => #?req); + self.metrics.requests_total.inc(); - req_tx.send(req).await + debug!(self.log, "Sending rpc discovery"; "request" => #?req); + + self.rpc_tx.send(req).await } +} + +/// Contains the components that handle xDS responses for supported resources. +struct ResourceHandlers { + cluster_manager: ClusterManager, + listener_manager: ListenerManager, +} + +impl ResourceHandlers { + /// Checks if the discovery response matches any well known types, if none + /// match then it will return an `Err` containing the URL of the type + /// not recognised. + async fn handle_discovery_response( + &mut self, + response: DiscoveryResponse, + ) -> Result<(), String> { + match &*response.type_url { + CLUSTER_TYPE => self.cluster_manager.on_cluster_response(response).await, + ENDPOINT_TYPE => { + self.cluster_manager + .on_cluster_load_assignment_response(response) + .await + } + LISTENER_TYPE => self.listener_manager.on_listener_response(response).await, + _ => return Err(response.type_url), + } - async fn backoff( - log: &Logger, - backoff: &mut ExponentialBackoff, - ) -> Result<(), ExecutionError> { - let delay = backoff.next_backoff().ok_or_else(|| { - warn!(log, "Backoff limit exceeded"); - ExecutionError::BackoffLimitExceeded - })?; - info!(log, "Retrying"; "delay" => #?delay); - tokio::time::sleep(delay).await; Ok(()) } } +#[derive(Debug, thiserror::Error)] +enum RpcSessionError { + #[error("Failed to establish initial connection.\n {0:?}")] + InitialConnect(TonicError), + + #[error("Error occured while receiving data. Status: {0}")] + Receive(tonic::Status), + + #[error("Non-recoverable aDS error:\nname: {0}\n{1}")] + NonRecoverable(&'static str, Box), +} + // Send a Discovery request with the provided arguments on the channel. pub(super) async fn send_discovery_req( log: Logger, diff --git a/src/xds/cluster.rs b/src/xds/cluster.rs index c7329be3a1..1f9e3f6905 100644 --- a/src/xds/cluster.rs +++ b/src/xds/cluster.rs @@ -304,13 +304,6 @@ impl ClusterManager { Ok(existing_endpoints) } - // Notify that we are about to reconnect the GRPC stream. - pub(in crate::xds) fn on_reconnect(&mut self) { - // Reset any last seen version and nonce since we'll be working - // with a new connection from now on with a clean slate. - self.last_seen_cluster_load_assignment_version = None - } - // Send a CDS ACK/NACK request to the server. async fn send_cluster_discovery_req( &mut self,