
Endpoint metadata support from static config #160

Merged: 3 commits, Jan 13, 2021
Changes from all commits
6 changes: 4 additions & 2 deletions src/cluster/cluster_manager.rs
@@ -27,6 +27,7 @@ use std::net::SocketAddr;
use std::{fmt, sync::Arc};
use tokio::sync::{mpsc, oneshot, watch};

use crate::cluster::Endpoint;
use crate::config::{EmptyListError, EndPoint, Endpoints, ManagementServer, UpstreamEndpoints};
use crate::xds::ads_client::{AdsClient, ClusterUpdate, ExecutionResult};

@@ -71,7 +72,8 @@ impl ClusterManager {
}

/// Returns a ClusterManager backed by the fixed set of clusters provided in the config.
-pub fn fixed(endpoints: Vec<EndPoint>) -> SharedClusterManager {
+pub fn fixed(endpoints: Vec<Endpoint>) -> SharedClusterManager {
+    // TODO: Return a result rather than unwrap.
Arc::new(RwLock::new(Self::new(Some(
Endpoints::new(endpoints)
.expect("endpoints list in config should be validated non-empty"),
@@ -140,7 +142,7 @@ impl ClusterManager {
endpoints
.endpoints
.into_iter()
-.map(|ep| EndPoint::new("N/A".into(), ep.address, vec![]))
+.map(|ep| Endpoint::from_address(ep.address))
})
.flatten();
endpoints.extend(cluster_endpoints);
30 changes: 29 additions & 1 deletion src/cluster/mod.rs
@@ -14,8 +14,9 @@
* limitations under the License.
*/

use crate::config::{parse_endpoint_metadata_from_yaml, EndPoint};
use serde_json::value::Value;
-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};
use std::net::SocketAddr;

#[cfg(not(doctest))]
@@ -31,6 +32,7 @@ pub(crate) mod cluster_manager {
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct Endpoint {
pub address: SocketAddr,
pub tokens: HashSet<Vec<u8>>,
markmandel (Member) commented on Dec 15, 2020:

Should we be capturing tokens at all here, since this seems like it's all handled by the Filters?

If you can define metadata on an Endpoint in the static section - is that not enough? Especially since the key for the token lookup or storage is configurable in TokenRouter? (Edit: got this confused with dynamic metadata)

Should the Filter define which metadata value holds the tokens, so it is configurable rather than hard-coded (with a standard default as well)?

The PR author (Collaborator) replied:


The main reason was to pre-process it so that tokens are simple and efficient to query. If we treat it like other metadata, the token router or any other filter that needs the info would need to traverse the JSON structure and scan a list each time.
Re pulling it out of the metadata: we can consider these special cases, since it's primarily Quilkin that needs them, compared to other metadata that the control plane might pass to a user's custom filter, for example.

> Should the Filter define which metadata value holds the tokens, so it is configurable rather than hard-coded (with a standard default as well)?

I'm not following the advantage of having it configurable. If, for example, a user filter needs the tokens accessible on a particular field, they can still put them under the desired key in the metadata without affecting any Quilkin use case, but I can't see why one would want it configurable if it's already present in the endpoint itself.

Member replied:


> The main reason was to pre-process it so that tokens are simple and efficient to query.

That makes sense - it's a performance concern. We will need to document how we use xDS really well, and include this 👍

> I'm not following the advantage of having it configurable?

That's a fair point. I am wondering if at some point someone might want it to be configurable at the config.yaml file level - basically as a system-wide configuration parameter - but we could also cross that bridge when we come to it.

Member commented:

Just a mental note for me: "tokens" as a name totally makes sense, since we have a TokenRouter 👍

pub metadata: Option<Value>,
}

@@ -52,3 +54,29 @@ pub struct Cluster {
}

pub type ClusterLocalities = HashMap<Option<Locality>, LocalityEndpoints>;

impl Endpoint {
pub fn new(address: SocketAddr, tokens: HashSet<Vec<u8>>, metadata: Option<Value>) -> Endpoint {
Endpoint {
address,
tokens,
metadata,
}
}

pub fn from_address(address: SocketAddr) -> Endpoint {
Endpoint::new(address, Default::default(), None)
}

/// Converts an endpoint config into an internal endpoint representation.
pub fn from_config(config: &EndPoint) -> Result<Endpoint, String> {
let (metadata, tokens) = if let Some(metadata) = config.metadata.clone() {
let (metadata, tokens) = parse_endpoint_metadata_from_yaml(metadata)?;
(Some(metadata), tokens)
} else {
(None, Default::default())
};

Ok(Endpoint::new(config.address, tokens, metadata))
}
}
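The `from_config` path above splits an endpoint's config metadata into two pieces: the user-visible metadata, and a pre-decoded set of byte tokens pulled out from under `quilkin.dev.tokens`. A minimal Python sketch of that extraction step (names and shapes are illustrative stand-ins for the Rust code, not a real API):

```python
import base64
from typing import Any

METADATA_KEY = "quilkin.dev"
TOKENS_KEY = "tokens"

def extract_endpoint_tokens(metadata: dict[str, Any]) -> tuple[dict, set[bytes]]:
    """Remove the quilkin.dev section and return (remaining metadata, decoded tokens)."""
    section = metadata.pop(METADATA_KEY, None)
    if section is None:
        return metadata, set()
    if not isinstance(section, dict):
        raise ValueError(f"invalid data type for key `{METADATA_KEY}`: value must be an object")
    raw_tokens = section.pop(TOKENS_KEY, [])
    if not isinstance(raw_tokens, list):
        raise ValueError(
            f"invalid data type for key `{METADATA_KEY}.{TOKENS_KEY}`: "
            "value must be a list of base64 strings"
        )
    tokens = set()
    for raw in raw_tokens:
        if not isinstance(raw, str):
            raise ValueError(f"invalid value in token list for key `{TOKENS_KEY}`")
        # Tokens are stored base64-encoded in config; decode once, up front.
        tokens.add(base64.b64decode(raw, validate=True))
    return metadata, tokens

meta, tokens = extract_endpoint_tokens(
    {"user": {"key1": "value1"},
     "quilkin.dev": {"tokens": ["MXg3aWp5Ng==", "OGdqM3YyaQ=="]}}
)
print(meta)                               # {'user': {'key1': 'value1'}}
print(tokens == {b"1x7ijy6", b"8gj3v2i"})  # True
```

Decoding once at config-load time is the performance point raised in the review thread: filters can then test membership in a set of raw byte tokens instead of traversing JSON metadata per packet.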
34 changes: 17 additions & 17 deletions src/config/endpoints.rs
@@ -14,7 +14,8 @@
* limitations under the License.
*/

-use crate::config::EndPoint;
+// TODO Move endpoint.rs out of config/ into cluster/
+use crate::cluster::Endpoint;
use std::sync::Arc;

#[derive(Debug)]
@@ -28,7 +29,7 @@ pub struct IndexOutOfRangeError;

/// Endpoints represents the set of all known upstream endpoints.
#[derive(Clone, Debug, PartialEq)]
-pub struct Endpoints(Arc<Vec<EndPoint>>);
+pub struct Endpoints(Arc<Vec<Endpoint>>);
Member commented:

That's nice naming.


/// UpstreamEndpoints represents a set of endpoints.
/// This set is guaranteed to be non-empty - any operation that would
Expand All @@ -45,7 +46,7 @@ pub struct UpstreamEndpoints {

impl Endpoints {
/// Returns an [`Endpoints`] backed by the provided list of endpoints.
-pub fn new(endpoints: Vec<EndPoint>) -> Result<Self, EmptyListError> {
+pub fn new(endpoints: Vec<Endpoint>) -> Result<Self, EmptyListError> {
if endpoints.is_empty() {
Err(EmptyListError)
} else {
@@ -98,7 +99,7 @@ impl UpstreamEndpoints {
/// Returns an error if the predicate returns `false` for all endpoints.
pub fn retain<F>(&mut self, predicate: F) -> Result<(), AllEndpointsRemovedError>
where
-F: Fn(&EndPoint) -> bool,
+F: Fn(&Endpoint) -> bool,
{
match self.subset.as_mut() {
Some(subset) => {
@@ -151,7 +152,7 @@ pub struct UpstreamEndpointsIter<'a> {
}

impl<'a> Iterator for UpstreamEndpointsIter<'a> {
-type Item = &'a EndPoint;
+type Item = &'a Endpoint;

fn next(&mut self) -> Option<Self::Item> {
match &self.collection.subset {
@@ -172,14 +173,11 @@ impl<'a> Iterator for UpstreamEndpointsIter<'a> {
#[cfg(test)]
mod tests {
use super::Endpoints;
-use crate::config::{EndPoint, UpstreamEndpoints};

-fn ep(id: usize) -> EndPoint {
-    EndPoint::new(
-        format!("ep-{}", id),
-        format!("127.0.0.{}:8080", id).parse().unwrap(),
-        vec![],
-    )
+use crate::cluster::Endpoint;
+use crate::config::UpstreamEndpoints;
+
+fn ep(id: usize) -> Endpoint {
+    Endpoint::from_address(format!("127.0.0.{}:8080", id).parse().unwrap())
}

#[test]
@@ -215,23 +213,25 @@ mod tests {

let mut up: UpstreamEndpoints = Endpoints::new(initial_endpoints.clone()).unwrap().into();

-up.retain(|ep| ep.name != "ep-2").unwrap();
+up.retain(|ep| ep.address.to_string().as_str() != "127.0.0.2:8080")
+    .unwrap();
assert_eq!(up.size(), 3);
assert_eq!(
vec![ep(1), ep(3), ep(4)],
up.iter().cloned().collect::<Vec<_>>()
);

-up.retain(|ep| ep.name != "ep-3").unwrap();
+up.retain(|ep| ep.address.to_string().as_str() != "127.0.0.3:8080")
+    .unwrap();
assert_eq!(up.size(), 2);
assert_eq!(vec![ep(1), ep(4)], up.iter().cloned().collect::<Vec<_>>());

// test an empty result on retain
-let result = up.retain(|ep| ep.name == "never");
+let result = up.retain(|_| false);
assert!(result.is_err());

let mut up: UpstreamEndpoints = Endpoints::new(initial_endpoints).unwrap().into();
-let result = up.retain(|ep| ep.name == "never");
+let result = up.retain(|_| false);
assert!(result.is_err());
}

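The retain tests above exercise the non-empty invariant documented on `UpstreamEndpoints`: filtering never leaves the set empty; instead the operation fails and the set is left unchanged. A Python sketch of that contract (a hypothetical stand-in class, not the real API):

```python
class AllEndpointsRemovedError(Exception):
    """Raised when a retain() call would remove every endpoint."""

class UpstreamEndpoints:
    """Non-empty view over an endpoint list; mirrors the Rust retain() contract."""

    def __init__(self, endpoints):
        if not endpoints:
            raise ValueError("endpoint list must be non-empty")
        self._endpoints = list(endpoints)

    def retain(self, predicate):
        kept = [ep for ep in self._endpoints if predicate(ep)]
        if not kept:
            # Refuse to drop to zero endpoints; leave the set unchanged.
            raise AllEndpointsRemovedError
        self._endpoints = kept

    def size(self):
        return len(self._endpoints)

up = UpstreamEndpoints(["127.0.0.1:8080", "127.0.0.2:8080", "127.0.0.3:8080"])
up.retain(lambda ep: ep != "127.0.0.2:8080")
print(up.size())  # 2
try:
    up.retain(lambda _: False)
except AllEndpointsRemovedError:
    print(up.size())  # still 2: the failed retain left the set intact
```

The design choice this encodes: a proxy must always have somewhere to send traffic, so "filter to nothing" is surfaced as an error for the caller to handle rather than silently producing an empty set.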
209 changes: 209 additions & 0 deletions src/config/metadata.rs
@@ -0,0 +1,209 @@
use serde_json::map::Map as JsonMap;
use serde_json::value::Value as JSONValue;
use serde_json::Number as JSONNumber;
use serde_yaml::Value as YamlValue;
use std::collections::HashSet;

/// METADATA_KEY is the key under which quilkin specific configuration is
/// placed in an endpoint metadata.
pub(crate) const METADATA_KEY: &str = "quilkin.dev";

/// ENDPOINT_METADATA_KEY_TOKENS is the key under which tokens for an endpoint
/// exist in an endpoint metadata.
pub const ENDPOINT_METADATA_TOKENS: &str = "tokens";

// Returns an empty map if no tokens exist.
pub fn extract_endpoint_tokens(
metadata: &mut JsonMap<String, JSONValue>,
) -> Result<HashSet<Vec<u8>>, String> {
let tokens = metadata.remove(METADATA_KEY)
.map(|raw_value| {
match raw_value {
JSONValue::Object(mut object) => {
match object.remove(ENDPOINT_METADATA_TOKENS) {
Some(JSONValue::Array(raw_tokens)) => {
raw_tokens.into_iter().fold(Ok(HashSet::new()), |acc, val| {
let mut tokens = acc?;

let token = match val {
JSONValue::String(token) =>
base64::decode(token)
.map_err(|err| format!(
"key {}.{}: failed to decode token as a base64 string:{}",
METADATA_KEY,
ENDPOINT_METADATA_TOKENS,
err
)),
_ => Err(format!(
"invalid value in token list for key `{}`: value must a base64 string",
ENDPOINT_METADATA_TOKENS
))
};

tokens.insert(token?);
Ok(tokens)
})
},
Some(_) => Err(format!(
"invalid data type for key `{}.{}`: value must be a list of base64 strings",
METADATA_KEY,
ENDPOINT_METADATA_TOKENS
)),
None => Ok(Default::default()),
}
}
_ => Err(format!("invalid data type for key `{}`: value must be an object", METADATA_KEY))
}
})
.transpose()?;

Ok(tokens.unwrap_or_default())
}

pub fn parse_endpoint_metadata_from_yaml(
yaml: YamlValue,
) -> Result<(JSONValue, HashSet<Vec<u8>>), String> {
let mapping = if let YamlValue::Mapping(mapping) = yaml {
mapping
} else {
return Err("invalid endpoint metadata: value must be a yaml object".into());
};

let mut map = JsonMap::new();
for (yaml_key, yaml_value) in mapping {
let key = parse_yaml_key(yaml_key)?;
let value = yaml_to_json_value(key.as_str(), yaml_value)?;
map.insert(key, value);
}

let tokens = extract_endpoint_tokens(&mut map)?;

Ok((JSONValue::Object(map), tokens))
}

fn yaml_to_json_value(key: &str, yaml: YamlValue) -> Result<JSONValue, String> {
let json_value = match yaml {
YamlValue::Null => JSONValue::Null,
YamlValue::Bool(v) => JSONValue::Bool(v),
YamlValue::Number(v) => match v.as_f64() {
Some(v) => JSONValue::Number(
JSONNumber::from_f64(v)
.ok_or_else(|| format!("invalid f64 `{:?}` provided for key `{}`", v, key))?,
),
None => return Err(format!("failed to parse key `{}` as f64 number", key)),
},
YamlValue::String(v) => JSONValue::String(v),
YamlValue::Sequence(v) => {
let mut array = vec![];
for yaml_value in v {
array.push(yaml_to_json_value(key, yaml_value)?);
}
JSONValue::Array(array)
}
YamlValue::Mapping(v) => {
let mut map = JsonMap::new();

for (yaml_key, yaml_value) in v {
let nested_key = parse_yaml_key(yaml_key)?;
let value =
yaml_to_json_value(format!("{}.{}", key, nested_key).as_str(), yaml_value)?;
map.insert(nested_key, value);
}

JSONValue::Object(map)
}
};

Ok(json_value)
}

fn parse_yaml_key(yaml: YamlValue) -> Result<String, String> {
match yaml {
YamlValue::String(v) => Ok(v),
v => Err(format!(
"invalid key `{:?}`: only string keys are allowed",
v
)),
}
}
#[cfg(test)]
mod tests {
use crate::config::metadata::{parse_endpoint_metadata_from_yaml, yaml_to_json_value};
use std::collections::HashSet;

#[test]
fn yaml_data_types() {
let yaml = "
one: two
three:
four:
- five
- 6
seven:
eight: true
";
let yaml_value = serde_yaml::from_str(yaml).unwrap();
let expected_json = serde_json::json!({
"one": "two",
"three": {
"four": ["five", 6.0],
"seven": {
"eight": true
}
}
});

assert_eq!(yaml_to_json_value("k", yaml_value).unwrap(), expected_json);
}

#[test]
fn yaml_parse_endpoint_metadata() {
let yaml = "
user:
key1: value1
quilkin.dev:
tokens:
- MXg3aWp5Ng== #1x7ijy6
- OGdqM3YyaQ== #8gj3v2i
";
let yaml_value = serde_yaml::from_str(yaml).unwrap();
let expected_user_metadata = serde_json::json!({
"user": {
"key1": "value1"
}
});

let (user_metadata, tokens) = parse_endpoint_metadata_from_yaml(yaml_value).unwrap();
assert_eq!(user_metadata, expected_user_metadata);
assert_eq!(
tokens,
vec!["1x7ijy6".into(), "8gj3v2i".into()]
.into_iter()
.collect::<HashSet<_>>()
);
}

#[test]
fn yaml_parse_invalid_endpoint_metadata() {
let not_a_list = "
quilkin.dev:
tokens: OGdqM3YyaQ==
";
let not_a_string_value = "
quilkin.dev:
tokens:
- OGdqM3YyaQ== #8gj3v2i
- 300
";
let not_a_base64_string = "
quilkin.dev:
tokens:
- OGdqM3YyaQ== #8gj3v2i
- 1x7ijy6
";
for yaml in &[not_a_list, not_a_string_value, not_a_base64_string] {
let yaml_value = serde_yaml::from_str(yaml).unwrap();
assert!(parse_endpoint_metadata_from_yaml(yaml_value).is_err());
}
}
}
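One subtlety the `yaml_data_types` test captures: the converter funnels every YAML number through `f64`, so the integer `6` comes out as `6.0` in the resulting JSON metadata. A Python sketch of that conversion rule, including the string-keys-only restriction (illustrative only, mirroring the Rust `yaml_to_json_value`):

```python
def yaml_to_json_value(key, value):
    """Mirror the Rust converter: numbers coerce to float, keys must be strings."""
    if isinstance(value, bool) or value is None or isinstance(value, str):
        return value
    if isinstance(value, (int, float)):
        return float(value)  # every number goes through f64, so 6 becomes 6.0
    if isinstance(value, list):
        return [yaml_to_json_value(key, v) for v in value]
    if isinstance(value, dict):
        out = {}
        for k, v in value.items():
            if not isinstance(k, str):
                raise ValueError(f"invalid key `{k!r}`: only string keys are allowed")
            # Track the dotted path so error messages can point at the bad key.
            out[k] = yaml_to_json_value(f"{key}.{k}", v)
        return out
    raise ValueError(f"unsupported value for key `{key}`")

print(yaml_to_json_value("k", {"three": {"four": ["five", 6]}}))
# {'three': {'four': ['five', 6.0]}}
```

Note the `bool` check comes before the number check: in Python `bool` is a subclass of `int`, and the Rust code likewise matches `Bool` before `Number`, so `true` stays a boolean rather than becoming `1.0`.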