Skip to content

Commit

Permalink
Include metadata for XDS endpoints (#154)
Browse files Browse the repository at this point in the history
This includes the endpoint metadata provided by the XDS API as a JSON
object internally.

Work on #10

Co-authored-by: Mark Mandel <[email protected]>
  • Loading branch information
iffyio and markmandel authored Dec 11, 2020
1 parent 2266ff4 commit 03350f8
Show file tree
Hide file tree
Showing 5 changed files with 356 additions and 10 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ prost = "0.6"
prost-types = "0.6.1"
serde = { version = "1.0.104", features = ["derive"] }
serde_yaml = "0.8.11"
serde_json = "1.0.60"
slog = "2.5.2"
slog-async = "2.4.0"
slog-json = "2.3.0"
Expand Down
2 changes: 2 additions & 0 deletions src/cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
* limitations under the License.
*/

use serde_json::value::Value;
use std::collections::HashMap;
use std::net::SocketAddr;

Expand All @@ -30,6 +31,7 @@ pub(crate) mod cluster_manager {
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct Endpoint {
pub address: SocketAddr,
pub metadata: Option<Value>,
}

#[derive(Clone, Debug, Hash, Eq, PartialEq)]
Expand Down
106 changes: 96 additions & 10 deletions src/xds/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use crate::xds::envoy::config::core::v3::{address, socket_address};
use crate::xds::envoy::config::endpoint::v3::{lb_endpoint, ClusterLoadAssignment};
use crate::xds::envoy::service::discovery::v3::{DiscoveryRequest, DiscoveryResponse};
use crate::xds::google::rpc::Status;
use crate::xds::metadata;
use crate::xds::{CLUSTER_TYPE, ENDPOINT_TYPE};

use crate::xds::error::Error;
Expand Down Expand Up @@ -250,11 +251,18 @@ impl ClusterManager {
sub_zone: locality.sub_zone,
});

let mut endpoint_addresses = vec![];
for host_identifier in lb_locality
.lb_endpoints
.into_iter()
.filter_map(|lb_endpoint| lb_endpoint.host_identifier)
// Extract components of the endpoint that we care about.
let mut processed_endpoints = vec![];
for (host_identifier, metadata) in
lb_locality
.lb_endpoints
.into_iter()
.filter_map(|lb_endpoint| {
let metadata = lb_endpoint.metadata;
lb_endpoint
.host_identifier
.map(|host_identifier| (host_identifier, metadata))
})
{
let endpoint = match host_identifier {
lb_endpoint::HostIdentifier::Endpoint(endpoint) => Ok(endpoint),
Expand All @@ -269,6 +277,7 @@ impl ClusterManager {
}
}?;

// Extract the endpoint's address.
let address = endpoint
.address
.and_then(|address| address.address)
Expand Down Expand Up @@ -297,12 +306,20 @@ impl ClusterManager {
Err(Error::new("received `Endpoint` with no `address`".into()))
})?;

endpoint_addresses.push(address);
// Extract any metadata associated with the endpoint.
let metadata = if let Some(metadata) = metadata {
Some(metadata::to_json(metadata).map_err(Error::new)?)
} else {
None
};

processed_endpoints.push((address, metadata));
}

let mut endpoints = vec![];
for (addr, port) in endpoint_addresses.into_iter() {
for ((addr, port), metadata) in processed_endpoints.into_iter() {
endpoints.push(Endpoint {
metadata,
// We only support IP addresses so anything else is an error.
address: addr
.parse::<std::net::IpAddr>()
Expand Down Expand Up @@ -403,7 +420,7 @@ mod tests {
use crate::test_utils::logger;
use crate::xds::envoy::config::cluster::v3::{cluster::ClusterDiscoveryType, Cluster};
use crate::xds::envoy::config::core::v3::{
address, socket_address::PortSpecifier, Address, SocketAddress,
address, socket_address::PortSpecifier, Address, Metadata, SocketAddress,
};
use crate::xds::envoy::config::endpoint::v3::{
lb_endpoint::HostIdentifier, ClusterLoadAssignment, Endpoint, LbEndpoint,
Expand All @@ -412,6 +429,9 @@ mod tests {
use crate::xds::envoy::service::discovery::v3::{DiscoveryRequest, DiscoveryResponse};
use crate::xds::{CLUSTER_TYPE, ENDPOINT_TYPE};
use prost::Message;
use prost_types::value::Kind;
use prost_types::Struct as ProstStruct;
use prost_types::Value as ProstValue;
use std::collections::{HashMap, HashSet};
use std::net::SocketAddr;
use tokio::sync::mpsc;
Expand Down Expand Up @@ -531,7 +551,8 @@ mod tests {
.unwrap()
.endpoints,
vec![ProxyEndpoint {
address: expected_socket_addr
address: expected_socket_addr,
metadata: None,
}]
);
assert_eq!(
Expand All @@ -543,7 +564,8 @@ mod tests {
.unwrap()
.endpoints,
vec![ProxyEndpoint {
address: "127.0.0.1:2020".parse().unwrap()
address: "127.0.0.1:2020".parse().unwrap(),
metadata: None,
}]
);
}
Expand Down Expand Up @@ -785,6 +807,70 @@ mod tests {
assert_cluster_has_lone_static_address(&cluster_b, "127.0.0.9:4040");
}

#[tokio::test]
async fn endpoint_metadata() {
// Test that we include associated endpoint metadata in cluster update.

let (cluster_updates_tx, mut cluster_updates_rx) = mpsc::channel::<ClusterState>(100);
let (discovery_req_tx, _) = mpsc::channel::<DiscoveryRequest>(100);
let mut cm = ClusterManager::new(logger(), cluster_updates_tx, discovery_req_tx);

cm.on_cluster_response(cluster_discovery_response_with_update(
"1",
"2",
vec!["a".into()],
|mut cluster| {
if let Some(assignment) = cluster.load_assignment.as_mut() {
// Set metadata mapping a key to an empty array.
assignment.endpoints[0].lb_endpoints[0].metadata = Some(Metadata {
filter_metadata: vec![(
format!("key-{}", cluster.name),
ProstStruct {
fields: vec![(
"one".into(),
ProstValue {
kind: Some(Kind::StringValue("two".into())),
},
)]
.into_iter()
.collect(),
},
)]
.into_iter()
.collect(),
})
};

cluster
},
))
.await;

// Read the cluster update.
let cluster_state = cluster_updates_rx.recv().await.unwrap();
assert_eq!(cluster_state.len(), 1);
let (cluster_name, cluster) = cluster_state.iter().next().unwrap();

assert_eq!(cluster.localities.len(), 1);
let (_, locality) = cluster.localities.iter().next().unwrap();

// Validate the metadata we set for the endpoint.
assert_eq!(locality.endpoints.len(), 1);
let endpoint = locality.endpoints.get(0).unwrap();
let metadata = endpoint.metadata.as_ref().unwrap();

let object = metadata.as_object().unwrap();
assert_eq!(object.len(), 1);

let value = object.get(&format!("key-{}", cluster_name)).unwrap();
assert_eq!(
value,
&serde_json::json!({
"one": "two"
})
);
}

// Test Helpers
fn create_endpoint_resource(cluster_name: &str) -> ClusterLoadAssignment {
ClusterLoadAssignment {
Expand Down
Loading

0 comments on commit 03350f8

Please sign in to comment.