Skip to content

Commit

Permalink
Add per-session filter chain
Browse files Browse the repository at this point in the history
* Add a filter chain to each session instance
* Add FilterFactory method to validate a filter's config so
  that we can validate config on startup rather than when
  later at runtime when the first session is created.
* Use Vec::truncate instead of slice::to_vec to avoid unneeded
  packet cloning.

Fixes #75
  • Loading branch information
iffyio committed Aug 16, 2020
1 parent b9e129a commit 46fc8d6
Show file tree
Hide file tree
Showing 8 changed files with 223 additions and 167 deletions.
7 changes: 7 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,26 @@ use std::net::SocketAddr;
use base64_serde::base64_serde_type;
use serde::export::Formatter;
use serde::{Deserialize, Serialize};
use std::error::Error;

base64_serde_type!(Base64Standard, base64::STANDARD);

/// Validation failure for a Config
#[derive(Debug, PartialEq)]
pub enum ValidationError {
NotUnique(String),
FieldInvalid { field: String, reason: String },
}

impl Error for ValidationError {}

impl fmt::Display for ValidationError {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
ValidationError::NotUnique(field) => write!(f, "field {} is not unique", field),
ValidationError::FieldInvalid { field, reason } => {
write!(f, "field {} is invalid: {}", field, reason)
}
}
}
}
Expand Down
29 changes: 10 additions & 19 deletions src/extensions/filter_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

use std::io::{Error, ErrorKind, Result};
use std::net::SocketAddr;
use std::sync::Arc;

use crate::config::{Config, EndPoint};
use crate::extensions::{Filter, FilterRegistry};
Expand All @@ -38,10 +37,7 @@ impl FilterChain {

// from_config returns a FilterChain from a given config. Will return a ErrorKind::InvalidInput
// if there is an issue with the passed in Configuration.
pub fn from_config(
config: Arc<Config>,
filter_registry: &FilterRegistry,
) -> Result<FilterChain> {
pub fn from_config(config: &Config, filter_registry: &FilterRegistry) -> Result<FilterChain> {
let mut filters = Vec::<Box<dyn Filter>>::new();
for filter_config in &config.filters {
match filter_registry.get(&filter_config.name, &filter_config.config) {
Expand Down Expand Up @@ -107,7 +103,7 @@ mod tests {
use crate::config::{ConnectionConfig, Local};
use crate::extensions::filters::DebugFilterFactory;
use crate::extensions::{default_registry, FilterFactory};
use crate::test_utils::{logger, noop_endpoint, TestFilter};
use crate::test_utils::{logger, TestFilter};

use super::*;

Expand All @@ -117,7 +113,7 @@ mod tests {
let provider = DebugFilterFactory::new(&log);

// everything is fine
let config = Arc::new(Config {
let config = Config {
local: Local { port: 0 },
filters: vec![config::Filter {
name: provider.name(),
Expand All @@ -128,14 +124,14 @@ mod tests {
connection_id: "".into(),
lb_policy: None,
},
});
};

let registry = default_registry(&log);
let chain = FilterChain::from_config(config, &registry).unwrap();
let chain = FilterChain::from_config(&config, &registry).unwrap();
assert_eq!(1, chain.filters.len());

// uh oh, something went wrong
let config = Arc::new(Config {
let config = Config {
local: Local { port: 0 },
filters: vec![config::Filter {
name: "this is so wrong".to_string(),
Expand All @@ -146,8 +142,8 @@ mod tests {
connection_id: "".into(),
lb_policy: None,
},
});
let result = FilterChain::from_config(config, &registry);
};
let result = FilterChain::from_config(&config, &registry);
assert!(result.is_err());
}

Expand Down Expand Up @@ -180,9 +176,7 @@ mod tests {
)
.unwrap();

let mut expected = endpoints_fixture.clone();
expected.push(noop_endpoint());
assert_eq!(expected, eps);
assert_eq!(endpoints_fixture.clone(), eps);
assert_eq!(
"hello:odr:127.0.0.1:70",
from_utf8(content.as_slice()).unwrap()
Expand Down Expand Up @@ -216,10 +210,7 @@ mod tests {
)
.unwrap();

let mut expected = endpoints_fixture.clone();
expected.push(noop_endpoint());
expected.push(noop_endpoint());
assert_eq!(expected, eps);
assert_eq!(endpoints_fixture.clone(), eps);
assert_eq!(
"hello:odr:127.0.0.1:70:odr:127.0.0.1:70",
from_utf8(content.as_slice()).unwrap()
Expand Down
51 changes: 43 additions & 8 deletions src/extensions/filter_registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,11 @@
*/

use std::collections::HashMap;
use std::error::Error as StdError;
use std::fmt;
use std::net::SocketAddr;

use serde::export::Formatter;

use crate::config::EndPoint;
use crate::config::{EndPoint, Filter as FilterConfig, ValidationError};

/// Filter is a trait for routing and manipulating packets.
pub trait Filter: Send + Sync {
Expand Down Expand Up @@ -53,16 +52,23 @@ pub trait Filter: Send + Sync {
/// Error is an error when attempting to create a Filter from_config() from a FilterFactory
pub enum Error {
NotFound(String),
FieldInvalid { field: String, reason: String },
InvalidConfig(ValidationError),
}

impl StdError for Error {
fn source(&self) -> Option<&(dyn StdError + 'static)> {
match self {
Error::InvalidConfig(err) => Some(err),
_ => None,
}
}
}

impl fmt::Display for Error {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Error::NotFound(key) => write!(f, "filter {} is not found", key),
Error::FieldInvalid { field, reason } => {
write!(f, "field {} is invalid: {}", field, reason)
}
Error::InvalidConfig(err) => write!(f, "filter configuration is invalid: {}", err),
}
}
}
Expand All @@ -71,6 +77,14 @@ impl fmt::Display for Error {
pub trait FilterFactory: Sync + Send {
/// name returns the configuration name for the Filter
fn name(&self) -> String;

/// Checks that the provided config is of a type expected by the filter
/// and that it is valid.
fn validate_config(&self, config: &serde_yaml::Value) -> Result<(), ValidationError>;

/// Returns a new filter based on the provided `config`.
/// This method is only called if `validate_config` returned `Ok(())` for `config`.
/// As a result, the method can assume that a valid `config` is provided.
fn create_from_config(&self, config: &serde_yaml::Value) -> Result<Box<dyn Filter>, Error>;
}

Expand All @@ -86,6 +100,27 @@ impl FilterRegistry {
}
}

/// Checks that all provided filter configurations are valid, returning
/// an error for each invalid config if any.
pub fn validate_filter_configs(&self, configs: &Vec<FilterConfig>) -> Result<(), Vec<Error>> {
let invalid = configs
.into_iter()
.map(|config| (self.registry.get(&config.name), config))
.filter_map(|(factory, config)| match factory {
None => Some(Error::NotFound(config.name.clone())),
Some(factory) => factory
.validate_config(&config.config)
.map_or_else(|err| Some(Error::InvalidConfig(err)), |_| None),
})
.collect::<Vec<_>>();

if invalid.is_empty() {
Ok(())
} else {
Err(invalid)
}
}

/// insert registers a Filter under the provider's given name.
pub fn insert<P: 'static>(&mut self, provider: P)
where
Expand Down
45 changes: 26 additions & 19 deletions src/extensions/filters/debug_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use std::net::SocketAddr;

use slog::{info, o, Logger};

use crate::config::EndPoint;
use crate::config::{EndPoint, ValidationError};
use crate::extensions::filter_registry::{Error, FilterFactory};
use crate::extensions::Filter;
use serde_yaml::Value;
Expand Down Expand Up @@ -69,35 +69,41 @@ impl DebugFilterFactory {
pub fn new(base: &Logger) -> Self {
DebugFilterFactory { log: base.clone() }
}
}

impl FilterFactory for DebugFilterFactory {
fn name(&self) -> String {
return String::from("quilkin.core.v1alpaha1.debug");
}

fn create_from_config(&self, config: &Value) -> Result<Box<dyn Filter>, Error> {
// pull out the Option<&Value>
fn validate(&self, config: &Value) -> Result<Option<String>, ValidationError> {
let prefix = match config {
serde_yaml::Value::Mapping(map) => map.get(&serde_yaml::Value::from("id")),
_ => None,
};

return match prefix {
match prefix {
// if no config value supplied, then no prefix, which is fine
None => Ok(Box::new(DebugFilter::new(&self.log, None))),
None => Ok(None),
// return an Error if the id exists but is not a string.
Some(value) => match value.as_str() {
None => Err(Error::FieldInvalid {
None => Err(ValidationError::FieldInvalid {
field: "config.id".to_string(),
reason: "id value should be a string".to_string(),
}),
Some(prefix) => Ok(Box::new(DebugFilter::new(
&self.log,
Some(prefix.to_string()),
))),
Some(prefix) => Ok(Some(prefix.to_owned())),
},
};
}
}
}

impl FilterFactory for DebugFilterFactory {
fn name(&self) -> String {
return String::from("quilkin.core.v1alpaha1.debug");
}

fn validate_config(&self, config: &Value) -> Result<(), ValidationError> {
self.validate(config).map(|_| ())
}

fn create_from_config(&self, config: &Value) -> Result<Box<dyn Filter>, Error> {
self.validate(config)
.map(|prefix| Box::new(DebugFilter::new(&self.log, prefix)) as Box<dyn Filter>)
.map_err(|err| Error::InvalidConfig(err))
}
}

Expand Down Expand Up @@ -145,6 +151,7 @@ mod tests {
use super::*;
use serde_yaml::Mapping;
use serde_yaml::Value;
use std::error::Error;

#[test]
fn on_downstream_receive() {
Expand Down Expand Up @@ -218,12 +225,12 @@ mod tests {
Ok(_) => assert!(false, "should be an error"),
Err(err) => {
assert_eq!(
Error::FieldInvalid {
ValidationError::FieldInvalid {
field: "config.id".to_string(),
reason: "id value should be a string".to_string()
}
.to_string(),
err.to_string()
err.source().unwrap().to_string()
);
}
}
Expand Down
4 changes: 4 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ async fn main() {

let config = Arc::new(Config::from_reader(File::open(filename).unwrap()).unwrap());
config.validate().unwrap();
filter_registry
.validate_filter_configs(&config.filters)
.unwrap();

let server = Server::new(
base_logger,
filter_registry,
Expand Down
Loading

0 comments on commit 46fc8d6

Please sign in to comment.