From 526de448fdd9bae62ca8e443d70fcea7bbb9da7e Mon Sep 17 00:00:00 2001
From: ifeanyi
Date: Thu, 3 Dec 2020 21:19:44 +0100
Subject: [PATCH 1/4] Add dynamic proxy configuration

Integrates the XDS client into the cluster manager

Work on #10
---
 docs/proxy.md                  |  15 +++
 examples/control-plane.yaml    |  29 +++++
 src/cluster/cluster_manager.rs |  91 ++++++++-------
 src/cluster/mod.rs             |   9 +-
 src/config/error.rs            |  24 +++-
 src/config/mod.rs              | 202 +++++++++++++++++++++++++++++----
 src/proxy/builder.rs           |   3 +
 src/proxy/server/error.rs      |   2 +
 src/proxy/server/metrics.rs    |  42 +++++++
 src/proxy/server/mod.rs        | 153 ++++++++++++++++---------
 src/xds/ads_client.rs          |   9 +-
 11 files changed, 451 insertions(+), 128 deletions(-)
 create mode 100644 docs/proxy.md
 create mode 100644 examples/control-plane.yaml
 create mode 100644 src/proxy/server/metrics.rs

diff --git a/docs/proxy.md b/docs/proxy.md
new file mode 100644
index 0000000000..9c8672390a
--- /dev/null
+++ b/docs/proxy.md
@@ -0,0 +1,15 @@
+### Proxy
+
+
+#### Metrics
+
+The proxy exposes the following core metrics:
+
+- `quilkin_proxy_packets_dropped_total{reason}` (Counter)
+
+  The total number of packets (not associated with any session) that were dropped by the proxy.
+  Note that packets reflected by this metric were dropped at an earlier stage, before they were associated with any session. For session-based metrics, see the list of [session metrics][session-metrics] instead.
+  * `reason = NoConfiguredEndpoints`
+    - `NoConfiguredEndpoints`: No upstream endpoints were available to send the packet to. This can occur, e.g. if the endpoints cluster was scaled down to zero and the proxy is configured via a control plane.
+
+[session-metrics]: ./session.md
diff --git a/examples/control-plane.yaml b/examples/control-plane.yaml
new file mode 100644
index 0000000000..b6bd8d0428
--- /dev/null
+++ b/examples/control-plane.yaml
@@ -0,0 +1,29 @@
+#
+# Copyright 2020 Google LLC All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+#
+# Example configuration for a Quilkin Proxy that is configured via a control plane.
+#
+
+version: v1alpha1
+proxy:
+  mode: SERVER # Run the proxy in server mode.
+  id: my-proxy # An identifier for the proxy instance.
+  port: 7001 # The port to receive traffic on locally.
+dynamic: # Provide configuration of endpoints using an XDS management server.
+  management_servers: # Array of management servers to configure the proxy with.
+                      # Multiple servers can be provided for redundancy.
+    - address: 127.0.0.1:26000
diff --git a/src/cluster/cluster_manager.rs b/src/cluster/cluster_manager.rs
index 68e9577eff..ce5f61b3e0 100644
--- a/src/cluster/cluster_manager.rs
+++ b/src/cluster/cluster_manager.rs
@@ -21,19 +21,20 @@
 // and we will need to acquire a read lock with every packet that is processed
 // to be able to capture the current endpoint state and pass it to Filters.
 use parking_lot::RwLock;
-use slog::{debug, warn, Logger};
+use slog::{debug, o, warn, Logger};
 use std::collections::HashMap;
 use std::net::SocketAddr;
 use std::{fmt, sync::Arc};
 use tokio::sync::{mpsc, oneshot, watch};

-use crate::config::{EmptyListError, EndPoint, Endpoints, UpstreamEndpoints};
+use crate::config::{EmptyListError, EndPoint, Endpoints, ManagementServer, UpstreamEndpoints};
 use crate::xds::ads_client::{AdsClient, ClusterUpdate, ExecutionResult};

 /// The max size of queue that provides updates from the XDS layer to the [`ClusterManager`].
 const CLUSTER_UPDATE_QUEUE_SIZE: usize = 1000;

-type Clusters = HashMap<String, Vec<SocketAddr>>;
+type Clusters = HashMap<String, Endpoints>;
+pub type SharedClusterManager = Arc<RwLock<ClusterManager>>;

 /// ClusterManager knows about all clusters and endpoints.
 pub struct ClusterManager {
@@ -67,61 +68,53 @@ impl ClusterManager {
     /// Returns all endpoints known at the time of invocation.
     /// Returns `None` if there are no endpoints.
     pub fn get_all_endpoints(&self) -> Option<UpstreamEndpoints> {
-        let endpoints = self
-            .clusters
+        // NOTE: We don't currently have support for consuming multiple clusters
+        // so here we assume that there is only a single cluster - Return all
+        // endpoints for _any_ cluster.
+        self.clusters
             .iter()
-            .map(|(name, addresses)| {
-                addresses
-                    .iter()
-                    .map(move |addr| EndPoint::new(name.clone(), *addr, vec![]))
-            })
-            .flatten()
-            .collect();
-
-        match Endpoints::new(endpoints) {
-            Ok(endpoints) => Some(endpoints.into()),
-            Err(EmptyListError) => None,
-        }
+            .next()
+            .map(|(cluster_name, endpoints)| endpoints.clone().into())
     }

     /// Returns a ClusterManager backed by the fixed set of clusters provided in the config.
-    pub fn fixed(endpoints: &[(String, SocketAddr)]) -> ClusterManager {
-        Self::new(
-            endpoints
-                .iter()
-                .cloned()
-                .map(|(name, addr)| (name, vec![addr]))
+    pub fn fixed(endpoints: Vec<EndPoint>) -> SharedClusterManager {
+        Arc::new(RwLock::new(Self::new(
+            Some("static-cluster".into())
+                .into_iter()
+                .map(|cluster_name| {
+                    (
+                        cluster_name,
+                        Endpoints::new(endpoints.clone())
+                            .expect("endpoints list in config should be validated non-empty"),
+                    )
+                })
                 .collect(),
-        )
+        )))
     }

     /// Returns a ClusterManager backed by a set of XDS servers.
     /// This function starts an XDS client in the background that talks to
     /// one of the provided servers.
-    /// Multiple servers are provided for redundancy - the servers will be
+    /// Multiple management servers can be provided for redundancy - the servers will be
     /// connected to in turn only in the case of failure.
     /// The set of clusters is continuously updated based on responses
     /// from the XDS server.
     /// The returned contains the XDS client's execution result after termination.
-    async fn dynamic<'a>(
-        log: Logger,
-        server_addresses: Vec<SocketAddr>,
-        xds_node_id: Option<String>,
+    pub async fn from_xds<'a>(
+        base_logger: Logger,
+        management_servers: Vec<ManagementServer>,
+        xds_node_id: String,
         mut shutdown_rx: watch::Receiver<()>,
-    ) -> Result<
-        (
-            Arc<RwLock<ClusterManager>>,
-            oneshot::Receiver<ExecutionResult>,
-        ),
-        InitializeError,
-    > {
+    ) -> Result<(SharedClusterManager, oneshot::Receiver<ExecutionResult>), InitializeError> {
+        let log = base_logger.new(o!("source" => "cluster::ClusterManager"));
         let (cluster_updates_tx, mut cluster_updates_rx) =
             mpsc::channel::<ClusterUpdate>(CLUSTER_UPDATE_QUEUE_SIZE);

         let (execution_result_tx, execution_result_rx) = oneshot::channel::<ExecutionResult>();
         Self::spawn_ads_client(
             log.clone(),
-            xds_node_id.unwrap_or_default(),
-            server_addresses,
+            xds_node_id,
+            management_servers,
             cluster_updates_tx,
             execution_result_tx,
             shutdown_rx.clone(),
@@ -151,14 +144,24 @@ impl ClusterManager {
     fn create_clusters_from_update(update: ClusterUpdate) -> Clusters {
         update
             .into_iter()
-            .map(|(name, cluster)| {
-                let addresses = cluster
+            .filter_map(|(name, cluster)| {
+                let endpoints = cluster
                     .localities
                     .into_iter()
-                    .map(|(_, endpoints)| endpoints.endpoints.into_iter().map(|ep| ep.address))
+                    .map(|(_, endpoints)| {
+                        endpoints
+                            .endpoints
+                            .into_iter()
+                            .map(|ep| EndPoint::new("N/A".into(), ep.address, vec![]))
+                    })
                     .flatten()
                     .collect::<Vec<_>>();
-                (name, addresses)
+
+                // If we get an error it means that the list is empty,
+                // in which case we forget the cluster entirely in turn.
+                Endpoints::new(endpoints)
+                    .ok()
+                    .map(|endpoints| (name, endpoints))
             })
             .collect()
     }
@@ -168,7 +171,7 @@ impl ClusterManager {
     fn spawn_ads_client(
         log: Logger,
         node_id: String,
-        server_addresses: Vec<SocketAddr>,
+        management_servers: Vec<ManagementServer>,
         cluster_updates_tx: mpsc::Sender<ClusterUpdate>,
         execution_result_tx: oneshot::Sender<ExecutionResult>,
         shutdown_rx: watch::Receiver<()>,
@@ -178,7 +181,7 @@ impl ClusterManager {
             .run(
                 log.clone(),
                 node_id,
-                server_addresses,
+                management_servers,
                 cluster_updates_tx,
                 shutdown_rx,
             )
diff --git a/src/cluster/mod.rs b/src/cluster/mod.rs
index f02804b57f..2575411823 100644
--- a/src/cluster/mod.rs
+++ b/src/cluster/mod.rs
@@ -18,7 +18,14 @@ use std::collections::HashMap;
 use std::net::SocketAddr;

 #[cfg(not(doctest))]
-mod cluster_manager;
+pub(crate) mod cluster_manager;
+
+// Stub module to work around not including cluster_manager in doc tests.
+#[cfg(doctest)] +pub(crate) mod cluster_manager { + pub struct ClusterManager; + pub struct SharedClusterManager; +} #[derive(Clone, Debug, Eq, PartialEq)] pub struct Endpoint { diff --git a/src/config/error.rs b/src/config/error.rs index c076ea1986..888489280b 100644 --- a/src/config/error.rs +++ b/src/config/error.rs @@ -1,12 +1,19 @@ use crate::extensions::Error as FilterRegistryError; use std::fmt::{self, Display, Formatter}; +#[derive(Debug, PartialEq)] +pub struct ValueInvalidArgs { + pub field: String, + pub clarification: Option, + pub examples: Option>, +} + /// Validation failure for a Config #[derive(Debug, PartialEq)] pub enum ValidationError { NotUnique(String), EmptyList(String), - ValueInvalid(String, Option>), + ValueInvalid(ValueInvalidArgs), FilterInvalid(FilterRegistryError), } @@ -15,11 +22,18 @@ impl Display for ValidationError { match self { ValidationError::NotUnique(field) => write!(f, "field {} is not unique", field), ValidationError::EmptyList(field) => write!(f, "field {} is cannot be an empty", field), - ValidationError::ValueInvalid(field, examples) => write!( + ValidationError::ValueInvalid(args) => write!( f, - "{} has an invalid value {}", - field, - examples.as_ref().map(|v| v.join(",")).unwrap_or_default() + "{} has an invalid value{}{}", + args.field, + args.clarification + .as_ref() + .map(|v| format!(": {}", v)) + .unwrap_or_default(), + args.examples + .as_ref() + .map(|v| format!("examples: {}", v.join(","))) + .unwrap_or_default() ), ValidationError::FilterInvalid(reason) => { write!(f, "filter configuration is invalid: {}", reason) diff --git a/src/config/mod.rs b/src/config/mod.rs index 98296ca37f..bbdb363c76 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -21,6 +21,7 @@ use std::net::SocketAddr; use base64_serde::base64_serde_type; use serde::{Deserialize, Serialize}; +use tonic::transport::Endpoint as TonicEndpoint; use uuid::Uuid; mod builder; @@ -30,8 +31,10 @@ mod error; pub use crate::config::endpoints::{ EmptyListError, Endpoints, UpstreamEndpoints, UpstreamEndpointsIter, }; +use crate::config::error::ValueInvalidArgs; pub use builder::Builder; pub use error::ValidationError; +use std::convert::TryInto; base64_serde_type!(Base64Standard, base64::STANDARD); @@ -93,6 +96,11 @@ pub struct Admin { address: Option, } +#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)] +pub struct ManagementServer { + pub address: String, +} + #[derive(Debug, Deserialize, Serialize)] pub enum Source { #[serde(rename = "static")] @@ -102,6 +110,13 @@ pub enum Source { endpoints: Vec, }, + #[serde(rename = "dynamic")] + Dynamic { + #[serde(default)] + filters: Vec, + + management_servers: Vec, + }, } /// Config is the configuration for either a Client or Server proxy @@ -130,19 +145,12 @@ impl Source { filters, endpoints: _, } => filters, + Source::Dynamic { + filters, + management_servers: _, + } => filters, } } - - pub fn get_endpoints(&self) -> Endpoints { - let endpoints = match &self { - Source::Static { - filters: _, - endpoints, - } => endpoints.clone(), - }; - - Endpoints::new(endpoints).expect("endpoints list in config should be validated non-empty") - } } /// Filter is the configuration for a single filter @@ -237,6 +245,45 @@ impl Source { )); } + Ok(()) + } + Source::Dynamic { + filters: _, + management_servers, + } => { + if management_servers.is_empty() { + return Err(ValidationError::EmptyList( + "dynamic.management_servers".to_string(), + )); + } + + if management_servers + .iter() + .map(|server| &server.address) + .collect::>() + 
.len() + != management_servers.len() + { + return Err(ValidationError::NotUnique( + "dynamic.management_servers.address".to_string(), + )); + } + + for server in management_servers { + let res: Result = server.address.clone().try_into(); + if res.is_err() { + return Err(ValidationError::ValueInvalid(ValueInvalidArgs { + field: "dynamic.management_servers.address".into(), + clarification: Some("the provided value must be a valid URI".into()), + examples: Some(vec![ + "http://127.0.0.1:8080".into(), + "127.0.0.1:8081".into(), + "example.com".into(), + ]), + })); + } + } + Ok(()) } } @@ -247,12 +294,38 @@ impl Source { mod tests { use serde_yaml::Value; - use crate::config::{Builder, Config, EndPoint, Endpoints, ProxyMode, ValidationError}; + use crate::config::{ + Builder, Config, EndPoint, ManagementServer, ProxyMode, Source, ValidationError, + }; fn parse_config(yaml: &str) -> Config { Config::from_reader(yaml.as_bytes()).unwrap() } + fn assert_static_endpoints(source: &Source, expected_endpoints: Vec) { + match source { + Source::Static { + filters: _, + endpoints, + } => { + assert_eq!(&expected_endpoints, endpoints,); + } + _ => unreachable!("expected static config source"), + } + } + + fn assert_management_servers(source: &Source, expected: Vec) { + match source { + Source::Dynamic { + filters: _, + management_servers, + } => { + assert_eq!(&expected, management_servers,); + } + _ => unreachable!("expected dynamic config source"), + } + } + #[test] fn deserialise_client() { let config = Builder::empty() @@ -385,14 +458,13 @@ static: let config = parse_config(yaml); assert_eq!(config.proxy.mode, ProxyMode::Client); - assert_eq!( - config.source.get_endpoints(), - Endpoints::new(vec![EndPoint::new( + assert_static_endpoints( + &config.source, + vec![EndPoint::new( "ep-1".into(), "127.0.0.1:25999".parse().unwrap(), - vec![] - )]) - .unwrap() + vec![], + )], ); } @@ -415,9 +487,9 @@ static: connection_ids: - bmt1eTcweA== #nkuy70x"; let config = parse_config(yaml); - assert_eq!( - config.source.get_endpoints(), - Endpoints::new(vec![ + assert_static_endpoints( + &config.source, + vec![ EndPoint::new( "Game Server No. 1".into(), "127.0.0.1:26000".parse().unwrap(), @@ -428,8 +500,92 @@ static: "127.0.0.1:26001".parse().unwrap(), vec!["nkuy70x".into()], ), - ]) - .unwrap() + ], + ); + } + + #[test] + fn parse_dynamic_source() { + let yaml = " +version: v1alpha1 +dynamic: + filters: + - name: quilkin.core.v1.rate-limiter + config: + map: of arbitrary key value pairs + could: + - also + - be + - 27 + - true + management_servers: + - address: 127.0.0.1:25999 + - address: 127.0.0.1:30000 + "; + let config = parse_config(yaml); + + let filter = config.source.get_filters().get(0).unwrap(); + assert_eq!("quilkin.core.v1.rate-limiter", filter.name); + let filter_config = filter.config.as_ref().unwrap().as_mapping().unwrap(); + + let key = Value::from("map"); + assert_eq!( + "of arbitrary key value pairs", + filter_config.get(&key).unwrap().as_str().unwrap() + ); + + assert_management_servers( + &config.source, + vec![ + ManagementServer { + address: "127.0.0.1:25999".into(), + }, + ManagementServer { + address: "127.0.0.1:30000".into(), + }, + ], + ); + } + + #[test] + fn validate_dynamic_source() { + let yaml = " +# Valid management address list. +version: v1alpha1 +dynamic: + management_servers: + - address: 127.0.0.1:25999 + - address: example.com + - address: http://127.0.0.1:30000 + "; + assert!(parse_config(yaml).validate().is_ok()); + + let yaml = " +# Invalid management address. 
+version: v1alpha1 +dynamic: + management_servers: + - address: 'not an endpoint address' + "; + match parse_config(yaml).validate().unwrap_err() { + ValidationError::ValueInvalid(args) => { + assert_eq!(args.field, "dynamic.management_servers.address".to_string()); + } + err => unreachable!("expected invalid value error: got {}", err), + } + + let yaml = " +# Duplicate management addresses. +version: v1alpha1 +dynamic: + management_servers: + - address: 127.0.0.1:25999 + - address: 127.0.0.1:25999 + "; + assert_eq!( + ValidationError::NotUnique("dynamic.management_servers.address".to_string()) + .to_string(), + parse_config(yaml).validate().unwrap_err().to_string() ); } diff --git a/src/proxy/builder.rs b/src/proxy/builder.rs index abec94cebd..60e41e8ab8 100644 --- a/src/proxy/builder.rs +++ b/src/proxy/builder.rs @@ -1,5 +1,6 @@ use crate::config::{Config, ValidationError}; use crate::extensions::{default_registry, CreateFilterError, FilterChain, FilterRegistry}; +use crate::proxy::server::metrics::Metrics as ProxyMetrics; use crate::proxy::{Metrics, Server}; use slog::{o, Drain, Logger}; use std::{ @@ -120,6 +121,8 @@ impl Builder { Server { log: self.log.new(o!("source" => "server::Server")), config: self.config, + proxy_metrics: ProxyMetrics::new(&self.metrics.registry.clone()) + .expect("metrics should be setup properly"), metrics: self.metrics, filter_chain: self.validation_status.0, } diff --git a/src/proxy/server/error.rs b/src/proxy/server/error.rs index 77aa071acb..ee3f09d62c 100644 --- a/src/proxy/server/error.rs +++ b/src/proxy/server/error.rs @@ -3,6 +3,7 @@ use std::fmt::{self, Display, Formatter}; #[derive(Debug)] pub enum Error { + Initialize(String), Session(SessionError), Bind(tokio::io::Error), } @@ -15,6 +16,7 @@ impl std::error::Error for Error {} impl Display for Error { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { match self { + Error::Initialize(reason) => write!(f, "failed to startup properly: {}", reason), Error::Session(inner) => write!(f, "session error: {}", inner), Error::Bind(inner) => write!(f, "failed to bind to port: {}", inner), } diff --git a/src/proxy/server/metrics.rs b/src/proxy/server/metrics.rs new file mode 100644 index 0000000000..18d23abf73 --- /dev/null +++ b/src/proxy/server/metrics.rs @@ -0,0 +1,42 @@ +/* + * Copyright 2020 Google LLC All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use crate::metrics::{opts, CollectorExt}; +use prometheus::core::{AtomicI64, GenericCounter}; +use prometheus::{IntCounterVec, Registry, Result as MetricsResult}; + +#[derive(Clone)] +pub struct Metrics { + pub packets_dropped_no_endpoints: GenericCounter, +} + +impl Metrics { + pub fn new(registry: &Registry) -> MetricsResult { + let subsystem = "proxy"; + Ok(Self { + packets_dropped_no_endpoints: IntCounterVec::new( + opts( + "packets_dropped_total", + subsystem, + "Total number of packets dropped by the proxy", + ), + &["reason"], + )? + .register_if_not_exists(registry)? 
+ .get_metric_with_label_values(&["NoConfiguredEndpoints"])?, + }) + } +} diff --git a/src/proxy/server/mod.rs b/src/proxy/server/mod.rs index 407daede14..5cf78a8922 100644 --- a/src/proxy/server/mod.rs +++ b/src/proxy/server/mod.rs @@ -26,14 +26,17 @@ use tokio::sync::{mpsc, watch}; use tokio::sync::{Mutex, RwLock}; use tokio::time::{delay_for, Duration, Instant}; -use crate::config::{Config, EndPoint, Endpoints, ProxyMode, Source, UpstreamEndpoints}; +use crate::config::{Config, EndPoint, Source}; use crate::extensions::{DownstreamContext, Filter, FilterChain}; use crate::proxy::sessions::{Packet, Session, SESSION_TIMEOUT_SECONDS}; use super::metrics::{start_metrics_server, Metrics}; +use crate::cluster::cluster_manager::{ClusterManager, SharedClusterManager}; use crate::proxy::server::error::{Error, RecvFromError}; +use metrics::Metrics as ProxyMetrics; pub mod error; +pub(super) mod metrics; type SessionMap = Arc>>>; @@ -46,6 +49,17 @@ pub struct Server { pub(super) config: Arc, pub(super) filter_chain: Arc, pub(super) metrics: Metrics, + pub(super) proxy_metrics: ProxyMetrics, +} + +struct RecvFromArgs { + log: Logger, + metrics: Metrics, + proxy_metrics: ProxyMetrics, + cluster_manager: SharedClusterManager, + chain: Arc, + sessions: SessionMap, + send_packets: mpsc::Sender, } impl Server { @@ -71,7 +85,7 @@ impl Server { self.run_receive_packet(send_socket, receive_packets); self.run_prune_sessions(&sessions); self.run_recv_from( - self.config.source.get_endpoints(), + self.create_cluster_manager(shutdown_rx.clone()).await?, self.filter_chain.clone(), receive_socket, &sessions, @@ -82,6 +96,47 @@ impl Server { Ok(()) } + async fn create_cluster_manager( + &self, + shutdown_rx: watch::Receiver<()>, + ) -> Result { + match &self.config.source { + Source::Static { + filters: _, + endpoints, + } => Ok(ClusterManager::fixed(endpoints.to_vec())), + Source::Dynamic { + filters: _, + management_servers, + } => { + let (cm, execution_result_rx) = ClusterManager::from_xds( + self.log.clone(), + management_servers.to_vec(), + self.config.proxy.id.clone(), + shutdown_rx, + ) + .await + .map_err(|err| Error::Initialize(format!("{}", err)))?; + + // Spawn a task to check for an error if the XDS client + // terminates and forward the error upstream. + let log = self.log.clone(); + tokio::spawn(async move { + if let Err(err) = execution_result_rx.await { + // TODO: For now only log the error but we would like to + // initiate a shut down instead once this happens. + error!( + log, + "ClusterManager XDS client terminated with an error: {}", err + ); + } + }); + + Ok(cm) + } + } + } + /// run_prune_sessions starts the timer for pruning sessions and runs prune_sessions every /// SESSION_TIMEOUT_SECONDS, via a tokio::spawn, i.e. it's non-blocking. /// Pruning will occur ~ every interval period. So the timeout expiration may sometimes @@ -102,7 +157,7 @@ impl Server { // Server::recv_from() to process new incoming packets. 
fn run_recv_from( &self, - endpoints: Endpoints, + cluster_manager: SharedClusterManager, chain: Arc, mut receive_socket: RecvHalf, sessions: &SessionMap, @@ -111,16 +166,20 @@ impl Server { let sessions = sessions.clone(); let log = self.log.clone(); let metrics = self.metrics.clone(); + let proxy_metrics = self.proxy_metrics.clone(); tokio::spawn(async move { loop { if let Err(err) = Server::recv_from( - &log, - &metrics, - endpoints.clone().into(), - chain.clone(), &mut receive_socket, - sessions.clone(), - send_packets.clone(), + RecvFromArgs { + log: log.clone(), + metrics: metrics.clone(), + proxy_metrics: proxy_metrics.clone(), + cluster_manager: cluster_manager.clone(), + chain: chain.clone(), + sessions: sessions.clone(), + send_packets: send_packets.clone(), + }, ) .await { @@ -133,32 +192,33 @@ impl Server { /// recv_from takes packets from the local socket and asynchronously /// processes them to send them out to endpoints. async fn recv_from( - log: &Logger, - metrics: &Metrics, - endpoints: UpstreamEndpoints, - chain: Arc, receive_socket: &mut RecvHalf, - sessions: SessionMap, - send_packets: mpsc::Sender, + args: RecvFromArgs, ) -> std::result::Result<(), RecvFromError> { let mut buf: Vec = vec![0; 65535]; let (size, recv_addr) = receive_socket .recv_from(&mut buf) .await .map_err(RecvFromError)?; - let log = log.clone(); - let metrics = metrics.clone(); tokio::spawn(async move { let packet = &buf[..size]; debug!( - log, + args.log, "Packet Received from: {}, {}", recv_addr, from_utf8(packet).unwrap() ); - let result = chain.on_downstream_receive(DownstreamContext::new( + let endpoints = match args.cluster_manager.read().get_all_endpoints() { + Some(endpoints) => endpoints, + None => { + args.proxy_metrics.packets_dropped_no_endpoints.inc(); + return; + } + }; + + let result = args.chain.on_downstream_receive(DownstreamContext::new( endpoints, recv_addr, packet.to_vec(), @@ -167,21 +227,21 @@ impl Server { if let Some(response) = result { for endpoint in response.endpoints.iter() { if let Err(err) = Server::ensure_session( - &log, - &metrics, - chain.clone(), - sessions.clone(), + &args.log, + &args.metrics, + args.chain.clone(), + args.sessions.clone(), recv_addr, endpoint, - send_packets.clone(), + args.send_packets.clone(), ) .await { - error!(log, "Error ensuring session exists"; "error" => %err); + error!(args.log, "Error ensuring session exists"; "error" => %err); continue; } - let map = sessions.read().await; + let map = args.sessions.read().await; let key = (recv_addr, endpoint.address); match map.get(&key) { Some(mtx) => { @@ -191,12 +251,12 @@ impl Server { session.increment_expiration().await; } Err(err) => { - error!(log, "Error sending packet from session"; "error" => %err) + error!(args.log, "Error sending packet from session"; "error" => %err) } }; } None => warn!( - log, + args.log, "Could not find session for key: ({}:{})", key.0.to_string(), key.1.to_string() @@ -239,20 +299,6 @@ impl Server { /// log_config outputs a log of what is configured fn log_config(&self) { info!(self.log, "Starting on port {}", self.config.proxy.port); - let addresses = match &self.config.source { - Source::Static { - filters: _, - endpoints, - } => endpoints.iter().map(|ep| ep.address), - }; - match &self.config.proxy.mode { - ProxyMode::Client => { - info!(self.log, "Client proxy configuration"; "address" => format!("{:?}", addresses)) - } - ProxyMode::Server => { - info!(self.log, "Server proxy configuration"; "endpoints" => addresses.len()) - } - }; } /// bind binds the local 
configured port @@ -500,15 +546,20 @@ mod tests { let endpoint_address = endpoint.addr; tokio::spawn(async move { Server::recv_from( - &t.log, - &Metrics::default(), - Endpoints::new(vec![EndPoint::new("".into(), endpoint_address, vec![])]) - .unwrap() - .into(), - chain, &mut recv, - sessions_clone, - send_packets.clone(), + RecvFromArgs { + log: t.log.clone(), + metrics: Metrics::default(), + proxy_metrics: ProxyMetrics::new(&Metrics::default().registry).unwrap(), + cluster_manager: ClusterManager::fixed(vec![EndPoint::new( + "".into(), + endpoint_address, + vec![], + )]), + chain, + sessions: sessions_clone, + send_packets: send_packets.clone(), + }, ) .await }); @@ -580,7 +631,7 @@ mod tests { let server = Builder::from(config).validate().unwrap().build(); server.run_recv_from( - Endpoints::new(vec![EndPoint::new("".into(), endpoint.addr, vec![])]).unwrap(), + ClusterManager::fixed(vec![EndPoint::new("".into(), endpoint.addr, vec![])]), server.filter_chain.clone(), recv, &sessions, diff --git a/src/xds/ads_client.rs b/src/xds/ads_client.rs index 08aec40b29..b814e909f5 100644 --- a/src/xds/ads_client.rs +++ b/src/xds/ads_client.rs @@ -27,6 +27,7 @@ use tonic::{ }; use crate::cluster::Cluster; +use crate::config::ManagementServer; use crate::xds::cluster::ClusterManager; use crate::xds::envoy::config::core::v3::Node; use crate::xds::envoy::service::discovery::v3::{ @@ -81,7 +82,7 @@ impl AdsClient { self, base_logger: Logger, node_id: String, - server_addresses: Vec, + management_servers: Vec, cluster_updates_tx: mpsc::Sender, mut shutdown_rx: watch::Receiver<()>, ) -> ExecutionResult { @@ -101,9 +102,9 @@ impl AdsClient { // Pick a server to talk to. let server_addr = { - let server_addr = server_addresses - .get(next_server_index % server_addresses.len()) - .cloned() + let server_addr = management_servers + .get(next_server_index % management_servers.len()) + .map(|server| server.address.clone()) // We have previously validated that a config provides at least one // server address so this default value shouldn't be necessary. .unwrap_or_else(|| "127.0.0.1:18000".into()); From 9bfd94a9e524aaac79ecbdf5adbcf7f8d7ee5b81 Mon Sep 17 00:00:00 2001 From: ifeanyi Date: Fri, 4 Dec 2020 11:11:57 +0100 Subject: [PATCH 2/4] Do not store endpoints per cluster --- src/cluster/cluster_manager.rs | 70 ++++++++++++++-------------------- 1 file changed, 29 insertions(+), 41 deletions(-) diff --git a/src/cluster/cluster_manager.rs b/src/cluster/cluster_manager.rs index ce5f61b3e0..eb3a2f0de8 100644 --- a/src/cluster/cluster_manager.rs +++ b/src/cluster/cluster_manager.rs @@ -33,12 +33,11 @@ use crate::xds::ads_client::{AdsClient, ClusterUpdate, ExecutionResult}; /// The max size of queue that provides updates from the XDS layer to the [`ClusterManager`]. const CLUSTER_UPDATE_QUEUE_SIZE: usize = 1000; -type Clusters = HashMap; pub type SharedClusterManager = Arc>; /// ClusterManager knows about all clusters and endpoints. pub struct ClusterManager { - clusters: Clusters, + endpoints: Option, } /// InitializeError is returned with an error message if the @@ -57,40 +56,26 @@ impl fmt::Display for InitializeError { impl std::error::Error for InitializeError {} impl ClusterManager { - fn new(clusters: Clusters) -> Self { - Self { clusters } + fn new(endpoints: Option) -> Self { + Self { endpoints } } - fn update(&mut self, clusters: Clusters) { - self.clusters = clusters; + fn update(&mut self, endpoints: Option) { + self.endpoints = endpoints; } /// Returns all endpoints known at the time of invocation. 
     /// Returns `None` if there are no endpoints.
     pub fn get_all_endpoints(&self) -> Option<UpstreamEndpoints> {
-        // NOTE: We don't currently have support for consuming multiple clusters
-        // so here we assume that there is only a single cluster - Return all
-        // endpoints for _any_ cluster.
-        self.clusters
-            .iter()
-            .next()
-            .map(|(cluster_name, endpoints)| endpoints.clone().into())
+        self.endpoints.clone().map(|ep| ep.into())
     }

     /// Returns a ClusterManager backed by the fixed set of clusters provided in the config.
     pub fn fixed(endpoints: Vec<EndPoint>) -> SharedClusterManager {
-        Arc::new(RwLock::new(Self::new(
-            Some("static-cluster".into())
-                .into_iter()
-                .map(|cluster_name| {
-                    (
-                        cluster_name,
-                        Endpoints::new(endpoints.clone())
-                            .expect("endpoints list in config should be validated non-empty"),
-                    )
-                })
-                .collect(),
-        )))
+        Arc::new(RwLock::new(Self::new(Some(
+            Endpoints::new(endpoints)
+                .expect("endpoints list in config should be validated non-empty"),
+        ))))
     }

     /// Returns a ClusterManager backed by a set of XDS servers.
@@ -125,7 +110,7 @@ impl ClusterManager {
         let cluster_update =
             Self::receive_initial_cluster_update(&mut cluster_updates_rx, &mut shutdown_rx).await?;

-        let cluster_manager = Arc::new(RwLock::new(Self::new(Self::create_clusters_from_update(
+        let cluster_manager = Arc::new(RwLock::new(Self::new(Self::create_endpoints_from_update(
             cluster_update,
         ))));

@@ -141,11 +126,14 @@ impl ClusterManager {
         Ok((cluster_manager, execution_result_rx))
     }

-    fn create_clusters_from_update(update: ClusterUpdate) -> Clusters {
-        update
+    fn create_endpoints_from_update(update: ClusterUpdate) -> Option<Endpoints> {
+        // NOTE: We don't currently have support for consuming multiple clusters
+        // so here gather all endpoints into the same set, ignoring what cluster they
+        // belong to.
+        let endpoints = update
             .into_iter()
-            .filter_map(|(name, cluster)| {
-                let endpoints = cluster
+            .fold(vec![], |mut endpoints, (_name, cluster)| {
+                let cluster_endpoints = cluster
                     .localities
                     .into_iter()
                     .map(|(_, endpoints)| {
                         endpoints
                             .endpoints
                             .into_iter()
                             .map(|ep| EndPoint::new("N/A".into(), ep.address, vec![]))
                     })
-                    .flatten()
-                    .collect::<Vec<_>>();
-
-                // If we get an error it means that the list is empty,
-                // in which case we forget the cluster entirely in turn.
-                Endpoints::new(endpoints)
-                    .ok()
-                    .map(|endpoints| (name, endpoints))
-            })
-            .collect()
+                    .flatten();
+                endpoints.extend(cluster_endpoints);
+
+                endpoints
+            });
+
+        match Endpoints::new(endpoints) {
+            Ok(endpoints) => Some(endpoints),
+            Err(EmptyListError) => None,
+        }
+    }

     // Spawns a task that runs an ADS client.
Cluster updates from the client @@ -230,7 +218,7 @@ impl ClusterManager { update = cluster_updates_rx.recv() => { match update { Some(update) => { - let update = Self::create_clusters_from_update(update); + let update = Self::create_endpoints_from_update(update); debug!(log, "Received a cluster update."); cluster_manager.write().update(update); } From e930ae225c8bbc53909e4cc8bd87582c505edb2b Mon Sep 17 00:00:00 2001 From: ifeanyi Date: Fri, 4 Dec 2020 11:24:22 +0100 Subject: [PATCH 3/4] update clone depth --- cloudbuild.yaml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/cloudbuild.yaml b/cloudbuild.yaml index ea8844c9ed..e146a1e0df 100644 --- a/cloudbuild.yaml +++ b/cloudbuild.yaml @@ -18,11 +18,11 @@ steps: args: - '-c' - | - git clone --depth 100 https://github.com/envoyproxy/data-plane-api.git proto/data-plane-api + git clone --depth 400 https://github.com/envoyproxy/data-plane-api.git proto/data-plane-api git -C proto/data-plane-api checkout b84d3bea45b59abc3fd21fba26140a379461fc67 - git clone --depth 100 https://github.com/cncf/udpa.git proto/udpa + git clone --depth 400 https://github.com/cncf/udpa.git proto/udpa git -C proto/udpa checkout efcf912fb35470672231c7b7bef620f3d17f655a - git clone --depth 100 https://github.com/envoyproxy/protoc-gen-validate.git proto/protoc-gen-validate + git clone --depth 400 https://github.com/envoyproxy/protoc-gen-validate.git proto/protoc-gen-validate git -C proto/protoc-gen-validate checkout e84d38a1a4c27d4662779c31a06528cdbc6b4b4f git clone --depth 400 https://github.com/googleapis/googleapis.git proto/googleapis git -C proto/googleapis checkout 2db5725bf898b544a0cf951e1694d3b0fce5eda3 From f0a46e2f62a5d6a8edd6cccb89152fe78579f6a6 Mon Sep 17 00:00:00 2001 From: ifeanyi Date: Mon, 7 Dec 2020 23:00:02 +0100 Subject: [PATCH 4/4] update proxy config --- docs/proxy-configuration.md | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/docs/proxy-configuration.md b/docs/proxy-configuration.md index 143dc04d87..efbe0dbb48 100644 --- a/docs/proxy-configuration.md +++ b/docs/proxy-configuration.md @@ -53,6 +53,7 @@ properties: type: object description: | Static configuration of endpoints and filters. + NOTE: Exactly one of `static` or `dynamic` can be specified. properties: filter: '$ref': '#/definitions/filterchain' @@ -60,6 +61,32 @@ properties: '$ref': '#/definitions/endpoints' required: - endpoints + dynamic: + type: object + description: | + Dynamic configuration of endpoints and filters. + NOTE: Exactly one of `static` or `dynamic` can be specified. + properties: + filter: + '$ref': '#/definitions/filterchain' + management_servers: + type: array + description: | + A list of XDS management servers to fetch configuration from. + Multiple servers can be provided for redundancy for the proxy to + fall back to upon error. + items: + type: object + description: | + Configuration for a management server. + properties: + address: + type: string + description: | + Address of the management server. This must have the `http(s)` scheme prefix. + Example: `http://example.com` + required: + - management_servers required: - version
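
For readers wiring this up, below is a minimal, illustrative sketch (not part of the patch series above) of the address check that `Source::validate` performs on each `dynamic.management_servers.address`, using the same `TryInto<tonic::transport::Endpoint>` conversion the patch relies on. The helper name and the `main` harness are hypothetical, and the accepted/rejected inputs mirror the `validate_dynamic_source` test; it assumes the same tonic version the patch builds against.

```rust
use std::convert::TryInto;

use tonic::transport::Endpoint;

/// Returns true if `address` parses as a URI that tonic can later dial,
/// which is the same conversion `Source::validate` attempts per server.
fn is_valid_management_address(address: &str) -> bool {
    let result: Result<Endpoint, _> = address.to_string().try_into();
    result.is_ok()
}

fn main() {
    // Accepted forms, as listed in the validation error's examples.
    assert!(is_valid_management_address("http://127.0.0.1:8080"));
    assert!(is_valid_management_address("127.0.0.1:8081"));
    assert!(is_valid_management_address("example.com"));

    // Rejected, as exercised by the `validate_dynamic_source` test.
    assert!(!is_valid_management_address("not an endpoint address"));
}
```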