Skip to content

Commit

Permalink
Endpoint metadata support from static config
Browse files Browse the repository at this point in the history
Parses generic yaml key values into a json value and in order to
use as the endpoint's metadata.
Values under the `quilkin.dev` key is reserved for quilkin usage.
Extracts token into the endpoint object since we don't want to traverse
the json value on each lookup.
Removes name and connection ID from the static Endpoint config struct.

Work on #10
Fixes #152
  • Loading branch information
iffyio committed Dec 14, 2020
1 parent 03350f8 commit ff4ba45
Show file tree
Hide file tree
Showing 26 changed files with 579 additions and 400 deletions.
6 changes: 4 additions & 2 deletions src/cluster/cluster_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use std::net::SocketAddr;
use std::{fmt, sync::Arc};
use tokio::sync::{mpsc, oneshot, watch};

use crate::cluster::Endpoint;
use crate::config::{EmptyListError, EndPoint, Endpoints, ManagementServer, UpstreamEndpoints};
use crate::xds::ads_client::{AdsClient, ClusterUpdate, ExecutionResult};

Expand Down Expand Up @@ -71,7 +72,8 @@ impl ClusterManager {
}

/// Returns a ClusterManager backed by the fixed set of clusters provided in the config.
pub fn fixed(endpoints: Vec<EndPoint>) -> SharedClusterManager {
pub fn fixed(endpoints: Vec<Endpoint>) -> SharedClusterManager {
// TODO: Return a result rather than unwrap.
Arc::new(RwLock::new(Self::new(Some(
Endpoints::new(endpoints)
.expect("endpoints list in config should be validated non-empty"),
Expand Down Expand Up @@ -140,7 +142,7 @@ impl ClusterManager {
endpoints
.endpoints
.into_iter()
.map(|ep| EndPoint::new("N/A".into(), ep.address, vec![]))
.map(|ep| Endpoint::from_address(ep.address))
})
.flatten();
endpoints.extend(cluster_endpoints);
Expand Down
30 changes: 29 additions & 1 deletion src/cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@
* limitations under the License.
*/

use crate::config::{parse_endpoint_metadata_from_yaml, EndPoint};
use serde_json::value::Value;
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::net::SocketAddr;

#[cfg(not(doctest))]
Expand All @@ -31,6 +32,7 @@ pub(crate) mod cluster_manager {
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct Endpoint {
pub address: SocketAddr,
pub tokens: HashSet<Vec<u8>>,
pub metadata: Option<Value>,
}

Expand All @@ -52,3 +54,29 @@ pub struct Cluster {
}

pub type ClusterLocalities = HashMap<Option<Locality>, LocalityEndpoints>;

impl Endpoint {
pub fn new(address: SocketAddr, tokens: HashSet<Vec<u8>>, metadata: Option<Value>) -> Endpoint {
Endpoint {
address,
tokens,
metadata,
}
}

pub fn from_address(address: SocketAddr) -> Endpoint {
Endpoint::new(address, Default::default(), None)
}

/// Converts an endpoint config into an internal endpoint representation.
pub fn from_config(config: &EndPoint) -> Result<Endpoint, String> {
let (metadata, tokens) = if let Some(metadata) = config.metadata.clone() {
let (metadata, tokens) = parse_endpoint_metadata_from_yaml(metadata)?;
(Some(metadata), tokens)
} else {
(None, Default::default())
};

Ok(Endpoint::new(config.address, tokens, metadata))
}
}
37 changes: 20 additions & 17 deletions src/config/endpoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,11 @@
* limitations under the License.
*/

use crate::config::EndPoint;
pub const ENDPOINT_METADATA_KEY_PREFIX: &str = "quilkin.dev";
pub const ENDPOINT_METADATA_TOKEN_KEY: &str = "endpoint.tokens";

// TODO Move endpoint.rs out of config/ into cluster/
use crate::cluster::Endpoint;
use std::sync::Arc;

#[derive(Debug)]
Expand All @@ -28,7 +32,7 @@ pub struct IndexOutOfRangeError;

/// Endpoints represents the set of all known upstream endpoints.
#[derive(Clone, Debug, PartialEq)]
pub struct Endpoints(Arc<Vec<EndPoint>>);
pub struct Endpoints(Arc<Vec<Endpoint>>);

/// UpstreamEndpoints represents a set of endpoints.
/// This set is guaranteed to be non-empty - any operation that would
Expand All @@ -45,7 +49,7 @@ pub struct UpstreamEndpoints {

impl Endpoints {
/// Returns an [`Endpoints`] backed by the provided list of endpoints.
pub fn new(endpoints: Vec<EndPoint>) -> Result<Self, EmptyListError> {
pub fn new(endpoints: Vec<Endpoint>) -> Result<Self, EmptyListError> {
if endpoints.is_empty() {
Err(EmptyListError)
} else {
Expand Down Expand Up @@ -98,7 +102,7 @@ impl UpstreamEndpoints {
/// Returns an error if the predicate returns `false` for all endpoints.
pub fn retain<F>(&mut self, predicate: F) -> Result<(), AllEndpointsRemovedError>
where
F: Fn(&EndPoint) -> bool,
F: Fn(&Endpoint) -> bool,
{
match self.subset.as_mut() {
Some(subset) => {
Expand Down Expand Up @@ -151,7 +155,7 @@ pub struct UpstreamEndpointsIter<'a> {
}

impl<'a> Iterator for UpstreamEndpointsIter<'a> {
type Item = &'a EndPoint;
type Item = &'a Endpoint;

fn next(&mut self) -> Option<Self::Item> {
match &self.collection.subset {
Expand All @@ -172,14 +176,11 @@ impl<'a> Iterator for UpstreamEndpointsIter<'a> {
#[cfg(test)]
mod tests {
use super::Endpoints;
use crate::config::{EndPoint, UpstreamEndpoints};

fn ep(id: usize) -> EndPoint {
EndPoint::new(
format!("ep-{}", id),
format!("127.0.0.{}:8080", id).parse().unwrap(),
vec![],
)
use crate::cluster::Endpoint;
use crate::config::UpstreamEndpoints;

fn ep(id: usize) -> Endpoint {
Endpoint::from_address(format!("127.0.0.{}:8080", id).parse().unwrap())
}

#[test]
Expand Down Expand Up @@ -215,23 +216,25 @@ mod tests {

let mut up: UpstreamEndpoints = Endpoints::new(initial_endpoints.clone()).unwrap().into();

up.retain(|ep| ep.name != "ep-2").unwrap();
up.retain(|ep| ep.address.to_string().as_str() != "127.0.0.2:8080")
.unwrap();
assert_eq!(up.size(), 3);
assert_eq!(
vec![ep(1), ep(3), ep(4)],
up.iter().cloned().collect::<Vec<_>>()
);

up.retain(|ep| ep.name != "ep-3").unwrap();
up.retain(|ep| ep.address.to_string().as_str() != "127.0.0.3:8080")
.unwrap();
assert_eq!(up.size(), 2);
assert_eq!(vec![ep(1), ep(4)], up.iter().cloned().collect::<Vec<_>>());

// test an empty result on retain
let result = up.retain(|ep| ep.name == "never");
let result = up.retain(|_| false);
assert!(result.is_err());

let mut up: UpstreamEndpoints = Endpoints::new(initial_endpoints).unwrap().into();
let result = up.retain(|ep| ep.name == "never");
let result = up.retain(|_| false);
assert!(result.is_err());
}

Expand Down
202 changes: 202 additions & 0 deletions src/config/metadata.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
use crate::config::{ENDPOINT_METADATA_KEY_PREFIX, ENDPOINT_METADATA_TOKEN_KEY};
use serde_json::map::Map as JsonMap;
use serde_json::value::Value as JSONValue;
use serde_json::Number as JSONNumber;
use serde_yaml::Value as YamlValue;
use std::collections::HashSet;

// Returns an empty map if no tokens exist.
pub fn extract_endpoint_tokens(
metadata: &mut JsonMap<String, JSONValue>,
) -> Result<HashSet<Vec<u8>>, String> {
let tokens = metadata.remove(ENDPOINT_METADATA_KEY_PREFIX)
.map(|raw_value| {
match raw_value {
JSONValue::Object(mut object) => {
match object.remove(ENDPOINT_METADATA_TOKEN_KEY) {
Some(JSONValue::Array(raw_tokens)) => {
raw_tokens.into_iter().fold(Ok(HashSet::new()), |acc, val| {
let mut tokens = acc?;

let token = match val {
JSONValue::String(token) =>
base64::decode(token)
.map_err(|err| format!(
"key {}.{}: failed to decode token as a base64 string:{}",
ENDPOINT_METADATA_KEY_PREFIX,
ENDPOINT_METADATA_TOKEN_KEY,
err
)),
_ => Err(format!(
"invalid value in token list for key `{}`: value must a base64 string",
ENDPOINT_METADATA_TOKEN_KEY
))
};

tokens.insert(token?);
Ok(tokens)
})
},
Some(_) => Err(format!(
"invalid data type for key `{}.{}`: value must be a list of base64 strings",
ENDPOINT_METADATA_KEY_PREFIX,
ENDPOINT_METADATA_TOKEN_KEY
)),
None => Ok(Default::default()),
}
}
_ => Err(format!("invalid data type for key `{}`: value must be an object", ENDPOINT_METADATA_KEY_PREFIX))
}
})
.transpose()?;

Ok(tokens.unwrap_or_default())
}

pub fn parse_endpoint_metadata_from_yaml(
yaml: YamlValue,
) -> Result<(JSONValue, HashSet<Vec<u8>>), String> {
let mapping = if let YamlValue::Mapping(mapping) = yaml {
mapping
} else {
return Err("invalid endpoint metadata: value must be a yaml object".into());
};

let mut map = JsonMap::new();
for (yaml_key, yaml_value) in mapping {
let key = parse_yaml_key(yaml_key)?;
let value = yaml_to_json_value(key.as_str(), yaml_value)?;
map.insert(key, value);
}

let tokens = extract_endpoint_tokens(&mut map)?;

Ok((JSONValue::Object(map), tokens))
}

fn yaml_to_json_value(key: &str, yaml: YamlValue) -> Result<JSONValue, String> {
let json_value = match yaml {
YamlValue::Null => JSONValue::Null,
YamlValue::Bool(v) => JSONValue::Bool(v),
YamlValue::Number(v) => match v.as_f64() {
Some(v) => JSONValue::Number(
JSONNumber::from_f64(v)
.ok_or_else(|| format!("invalid f64 `{:?}` provided for key `{}`", v, key))?,
),
None => return Err(format!("failed to parse key `{}` as f64 number", key)),
},
YamlValue::String(v) => JSONValue::String(v),
YamlValue::Sequence(v) => {
let mut array = vec![];
for yaml_value in v {
array.push(yaml_to_json_value(key, yaml_value)?);
}
JSONValue::Array(array)
}
YamlValue::Mapping(v) => {
let mut map = JsonMap::new();

for (yaml_key, yaml_value) in v {
let nested_key = parse_yaml_key(yaml_key)?;
let value =
yaml_to_json_value(format!("{}.{}", key, nested_key).as_str(), yaml_value)?;
map.insert(nested_key, value);
}

JSONValue::Object(map)
}
};

Ok(json_value)
}

fn parse_yaml_key(yaml: YamlValue) -> Result<String, String> {
match yaml {
YamlValue::String(v) => Ok(v),
v => Err(format!(
"invalid key `{:?}`: only string keys are allowed",
v
)),
}
}
#[cfg(test)]
mod tests {
use crate::config::metadata::{parse_endpoint_metadata_from_yaml, yaml_to_json_value};
use std::collections::HashSet;

#[test]
fn yaml_data_types() {
let yaml = "
one: two
three:
four:
- five
- 6
seven:
eight: true
";
let yaml_value = serde_yaml::from_str(yaml).unwrap();
let expected_json = serde_json::json!({
"one": "two",
"three": {
"four": ["five", 6.0],
"seven": {
"eight": true
}
}
});

assert_eq!(yaml_to_json_value("k", yaml_value).unwrap(), expected_json);
}

#[test]
fn yaml_parse_endpoint_metadata() {
let yaml = "
user:
key1: value1
quilkin.dev:
endpoint.tokens:
- MXg3aWp5Ng== #1x7ijy6
- OGdqM3YyaQ== #8gj3v2i
";
let yaml_value = serde_yaml::from_str(yaml).unwrap();
let expected_user_metadata = serde_json::json!({
"user": {
"key1": "value1"
}
});

let (user_metadata, tokens) = parse_endpoint_metadata_from_yaml(yaml_value).unwrap();
assert_eq!(user_metadata, expected_user_metadata);
assert_eq!(
tokens,
vec!["1x7ijy6".into(), "8gj3v2i".into()]
.into_iter()
.collect::<HashSet<_>>()
);
}

#[test]
fn yaml_parse_invalid_endpoint_metadata() {
let not_a_list = "
quilkin.dev:
endpoint.tokens: OGdqM3YyaQ==
";
let not_a_string_value = "
quilkin.dev:
endpoint.tokens:
- OGdqM3YyaQ== #8gj3v2i
- 300
";
let not_a_base64_string = "
quilkin.dev:
endpoint.tokens:
- OGdqM3YyaQ== #8gj3v2i
- 1x7ijy6
";
for yaml in vec![not_a_list, not_a_string_value, not_a_base64_string] {
let yaml_value = serde_yaml::from_str(yaml).unwrap();
assert!(parse_endpoint_metadata_from_yaml(yaml_value).is_err());
}
}
}
Loading

0 comments on commit ff4ba45

Please sign in to comment.