From ddb2236854340fac41e2ccbd539f5ee3204a00db Mon Sep 17 00:00:00 2001 From: XAMPPRocky <4464295+XAMPPRocky@users.noreply.github.com> Date: Wed, 28 Jul 2021 02:04:21 +0200 Subject: [PATCH] Filter Extension Re-organisation (#293) * Reorganisation filter extensions * Update src/filters/factory.rs * Update docs/extensions/filters/writing_custom_filters.md Co-authored-by: Ifeanyi Ubah Co-authored-by: Mark Mandel --- .../filters/writing_custom_filters.md | 14 +- examples/quilkin-filter-example/src/main.rs | 13 +- macros/src/filter.rs | 115 --------- macros/src/lib.rs | 55 +--- src/config.rs | 19 +- .../config.rs => config/config_type.rs} | 2 +- src/filters.rs | 62 +++-- src/filters/{extensions => }/capture_bytes.rs | 237 +++--------------- src/filters/capture_bytes/capture.rs | 37 +++ src/filters/capture_bytes/config.rs | 177 +++++++++++++ .../{extensions => }/capture_bytes/metrics.rs | 0 .../{extensions => }/capture_bytes/proto.rs | 0 src/filters/chain.rs | 4 +- src/filters/{extensions => }/compress.rs | 209 +++------------ src/filters/compress/compressor.rs | 48 ++++ src/filters/compress/config.rs | 129 ++++++++++ .../{extensions => }/compress/metrics.rs | 0 .../{extensions => }/compress/proto.rs | 0 src/filters/concatenate_bytes.rs | 95 +++++++ src/filters/concatenate_bytes/config.rs | 101 ++++++++ src/filters/{extensions => }/debug.rs | 71 +++--- src/filters/factory.rs | 6 +- src/filters/load_balancer.rs | 178 +++++++++++++ src/filters/load_balancer/config.rs | 149 +++++++++++ src/filters/load_balancer/endpoint_chooser.rs | 62 +++++ .../{extensions => }/load_balancer/proto.rs | 0 .../{extensions => }/local_rate_limit.rs | 162 ++++++------ .../local_rate_limit/metrics.rs | 0 .../local_rate_limit/proto.rs | 0 src/filters/{extensions.rs => metadata.rs} | 23 +- src/filters/set.rs | 30 +-- src/filters/{extensions => }/token_router.rs | 97 +++---- .../{extensions => }/token_router/metrics.rs | 0 .../{extensions => }/token_router/proto.rs | 0 src/lib.rs | 2 +- src/test_utils.rs | 2 +- tests/compress.rs | 6 +- tests/concatenate_bytes.rs | 4 +- tests/filter_order.rs | 11 +- tests/filters.rs | 4 +- tests/load_balancer.rs | 4 +- tests/local_rate_limit.rs | 4 +- tests/token_router.rs | 9 +- 43 files changed, 1348 insertions(+), 793 deletions(-) delete mode 100644 macros/src/filter.rs rename src/{filters/config.rs => config/config_type.rs} (97%) rename src/filters/{extensions => }/capture_bytes.rs (57%) create mode 100644 src/filters/capture_bytes/capture.rs create mode 100644 src/filters/capture_bytes/config.rs rename src/filters/{extensions => }/capture_bytes/metrics.rs (100%) rename src/filters/{extensions => }/capture_bytes/proto.rs (100%) rename src/filters/{extensions => }/compress.rs (79%) create mode 100644 src/filters/compress/compressor.rs create mode 100644 src/filters/compress/config.rs rename src/filters/{extensions => }/compress/metrics.rs (100%) rename src/filters/{extensions => }/compress/proto.rs (100%) create mode 100644 src/filters/concatenate_bytes.rs create mode 100644 src/filters/concatenate_bytes/config.rs rename src/filters/{extensions => }/debug.rs (91%) create mode 100644 src/filters/load_balancer.rs create mode 100644 src/filters/load_balancer/config.rs create mode 100644 src/filters/load_balancer/endpoint_chooser.rs rename src/filters/{extensions => }/load_balancer/proto.rs (100%) rename src/filters/{extensions => }/local_rate_limit.rs (88%) rename src/filters/{extensions => }/local_rate_limit/metrics.rs (100%) rename src/filters/{extensions => }/local_rate_limit/proto.rs 
(100%) rename src/filters/{extensions.rs => metadata.rs} (55%) rename src/filters/{extensions => }/token_router.rs (95%) rename src/filters/{extensions => }/token_router/metrics.rs (100%) rename src/filters/{extensions => }/token_router/proto.rs (100%) diff --git a/docs/extensions/filters/writing_custom_filters.md b/docs/extensions/filters/writing_custom_filters.md index 34728dbb85..e846d901ed 100644 --- a/docs/extensions/filters/writing_custom_filters.md +++ b/docs/extensions/filters/writing_custom_filters.md @@ -67,9 +67,10 @@ To extend Quilkin's code with our own custom filter, we need to do the following // src/main.rs use quilkin::filters::{Filter, ReadContext, ReadResponse, WriteContext, WriteResponse}; + const NAME: &str = "greet.v1"; + // This adds a constant named `NAME` that points // to `"greet.v1"`. - #[quilkin::filter("greet.v1")] struct Greet; impl Filter for Greet { @@ -88,7 +89,7 @@ To extend Quilkin's code with our own custom filter, we need to do the following ```rust // src/main.rs - # #[quilkin::filter("greet.v1")] + # const NAME: &str = "greet.v1"; # struct Greet; # impl Filter for Greet {} # use quilkin::filters::Filter; @@ -97,8 +98,8 @@ To extend Quilkin's code with our own custom filter, we need to do the following struct GreetFilterFactory; impl FilterFactory for GreetFilterFactory { fn name(&self) -> &'static str { - // We provide the name of filter that we defined with `#[quilkin::filter]` - Greet::FILTER_NAME + // We provide the name of the filter that we defined earlier. + NAME } fn create_filter(&self, _: CreateFilterArgs) -> Result<Box<dyn Filter>, Error> { Ok(Box::new(Greet)) } @@ -207,7 +208,6 @@ The [Serde] crate is used to describe static YAML configuration in code while [P # use quilkin::filters::{Filter, ReadContext, ReadResponse, WriteContext, WriteResponse}; - #[quilkin::filter("greet.v1")] struct Greet(String); impl Filter for Greet { @@ -238,7 +238,7 @@ The [Serde] crate is used to describe static YAML configuration in code while [P # struct Greet(String); # impl Filter for Greet { } - use quilkin::filters::ConfigType; + use quilkin::config::ConfigType; struct GreetFilterFactory; impl FilterFactory for GreetFilterFactory { @@ -345,7 +345,7 @@ However, it usually contains a Protobuf equivalent of the filter's static config ```rust // src/main.rs - # use quilkin::filters::{ConfigType, CreateFilterArgs, Error, Filter, FilterFactory}; + # use quilkin::{config::ConfigType, filters::{CreateFilterArgs, Error, Filter, FilterFactory}}; # use serde::{Deserialize, Serialize}; # #[derive(Serialize, Deserialize, Debug)] # struct Config { diff --git a/examples/quilkin-filter-example/src/main.rs b/examples/quilkin-filter-example/src/main.rs index 4f47d33d2e..27e9098e79 100644 --- a/examples/quilkin-filter-example/src/main.rs +++ b/examples/quilkin-filter-example/src/main.rs @@ -14,7 +14,7 @@ * limitations under the License.
*/ -use quilkin::filters::{prelude::*, DynFilterFactory};; +use quilkin::filters::prelude::*; use quilkin::runner::run; use bytes::Bytes; @@ -29,7 +29,12 @@ mod greet { include!(concat!(env!("OUT_DIR"), "/greet.rs")); } -#[quilkin::filter("greet.v1")] +pub const NAME: &str = "greet.v1"; + +pub fn factory() -> DynFilterFactory { + Box::from(GreetFilterFactory) +} + struct Greet(String); impl Filter for Greet { @@ -48,7 +53,7 @@ impl Filter for Greet { struct GreetFilterFactory; impl FilterFactory for GreetFilterFactory { fn name(&self) -> &'static str { - Greet::FILTER_NAME + NAME } fn create_filter(&self, args: CreateFilterArgs) -> Result, Error> { let greeting = match args.config.unwrap() { @@ -68,5 +73,5 @@ impl FilterFactory for GreetFilterFactory { #[tokio::main] async fn main() { - run(vec![DynFilterFactory::from(GreetFilterFactory)].into_iter()).await.unwrap(); + run(vec![self::factory()].into_iter()).await.unwrap(); } diff --git a/macros/src/filter.rs b/macros/src/filter.rs deleted file mode 100644 index c36c89cc71..0000000000 --- a/macros/src/filter.rs +++ /dev/null @@ -1,115 +0,0 @@ -/* - * Copyright 2021 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -use proc_macro2::{Span, TokenStream}; -use quote::{quote, ToTokens, TokenStreamExt}; -use syn::{ - parse::{Parse, ParseStream}, - spanned::Spanned, - Ident, NestedMeta, Token, -}; - -/// The data representation of `#[quilkin::filter]`. -pub(crate) struct FilterAttribute { - /// The protobuf ID for the filter. - id: String, - /// The visibility of the `PROTOBUF_ID` constant. - vis: syn::Visibility, -} - -fn parse_str_literal(input: &syn::Lit, span: Span) -> syn::Result { - match input { - syn::Lit::Str(s) => { - let s = s.value(); - - if s.is_empty() { - Err(syn::Error::new(span, "Str literal must not be empty.")) - } else { - Ok(s) - } - } - _ => Err(syn::Error::new(span, "Expected str literal.")), - } -} - -fn parse_meta_lit_str(input: &NestedMeta, span: Span) -> syn::Result { - match input { - NestedMeta::Lit(lit) => parse_str_literal(lit, lit.span()), - _ => Err(syn::Error::new(span, "Expected literal.")), - } -} - -fn parse_meta_name_value(input: &NestedMeta, span: Span) -> syn::Result { - match input { - NestedMeta::Meta(syn::Meta::NameValue(value)) => Ok(value.clone()), - _ => Err(syn::Error::new(span, "Expected `=`.")), - } -} - -impl Parse for FilterAttribute { - fn parse(input: ParseStream) -> syn::Result { - let args = syn::punctuated::Punctuated::::parse_terminated(input)?; - - let mut args = args.iter(); - - let id = { - let arg = args - .next() - .ok_or_else(|| syn::Error::new(input.span(), "Expected a protobuf identifier."))?; - parse_meta_lit_str(arg, arg.span())? 
- }; - - let mut vis = None; - - for arg in args { - let name_value = parse_meta_name_value(arg, arg.span())?; - - if name_value.path.is_ident("vis") { - if vis.is_some() { - return Err(syn::Error::new( - name_value.span(), - "`vis` defined more than once.", - )); - } - - let input = parse_str_literal(&name_value.lit, name_value.lit.span())?; - vis = Some(syn::parse_str(&input)?); - } - } - - Ok(Self { - id, - vis: vis.unwrap_or_else(|| syn::parse_quote!(pub (crate))), - }) - } -} - -impl ToTokens for FilterAttribute { - fn to_tokens(&self, tokens: &mut TokenStream) { - let id = &self.id; - let vis = &self.vis; - let mut protobuf_path = - syn::punctuated::Punctuated::::new(); - - let split = self.id.split('.'); - protobuf_path - .extend(split.map(|s| syn::PathSegment::from(Ident::new(s, Span::mixed_site())))); - - tokens.append_all(quote! { - #vis const FILTER_NAME: &'static str = #id; - }) - } -} diff --git a/macros/src/lib.rs b/macros/src/lib.rs index b136d9ed66..9f54bd4419 100644 --- a/macros/src/lib.rs +++ b/macros/src/lib.rs @@ -14,13 +14,11 @@ * limitations under the License. */ -mod filter; mod include; -use quote::{quote, ToTokens}; +use quote::ToTokens; use syn::parse_macro_input; -use filter::FilterAttribute; use include::IncludeProto; /// Includes generated Protobuf definitions from `tonic`. @@ -54,54 +52,3 @@ pub fn include_proto(input: proc_macro::TokenStream) -> proc_macro::TokenStream .to_token_stream() .into() } - -/// An attribute procedural macro for defining filters. -/// -/// The `filter` attribute can be prepended to any struct, to automatically -/// import the protobuf runtime version that was defined with [`include_proto`], -/// and it defines an associated constant named `FILTER_NAME` containing -/// the protobuf identifier. -/// -/// A string literal representing the gRPC Protobuf name of the struct should -/// always the first argument, followed by these optional keyword arguments for -/// additional configuration. -/// -/// - `root` sets the root of the path to import your Protobuf generated struct. -/// **default:** `self`. -/// -/// - `vis` sets the visibility of the associated `PROTOBUF_ID` constant. -/// **default:** `pub (crate)`. 
-/// -/// The macro generates code that looks something like the following; -/// -/// ### Input -/// ``` -/// #[quilkin::filter("quilkin.extensions.filters.debug.v1alpha1.Debug")] -/// pub struct Debug; -/// ``` -/// -/// ### Output -/// ``` -/// impl Debug { -/// pub (crate) const FILTER_NAME: &str = "quilkin.extensions.filters.debug.v1alpha1.Debug"; -/// } -/// ``` -#[proc_macro_attribute] -pub fn filter( - args: proc_macro::TokenStream, - input: proc_macro::TokenStream, -) -> proc_macro::TokenStream { - let constant = parse_macro_input!(args as FilterAttribute); - let item = parse_macro_input!(input as syn::ItemStruct); - let name = &item.ident; - let (impl_generics, ty_generics, where_clause) = item.generics.split_for_impl(); - - quote!( - #item - - impl #impl_generics #name #ty_generics #where_clause { - #constant - } - ) - .into() -} diff --git a/src/config.rs b/src/config.rs index 3b77fd2b07..a87e4774a7 100644 --- a/src/config.rs +++ b/src/config.rs @@ -23,17 +23,24 @@ use serde::{Deserialize, Serialize}; use uuid::Uuid; mod builder; +mod config_type; mod endpoints; mod error; mod metadata; -pub use crate::config::endpoints::{ - EmptyListError, Endpoints, RetainedItems, UpstreamEndpoints, UpstreamEndpointsIter, +pub(crate) use self::{ + error::ValueInvalidArgs, + metadata::{extract_endpoint_tokens, parse_endpoint_metadata_from_yaml}, +}; + +pub use self::{ + builder::Builder, + config_type::ConfigType, + endpoints::{ + EmptyListError, Endpoints, RetainedItems, UpstreamEndpoints, UpstreamEndpointsIter, + }, + error::ValidationError, }; -pub(crate) use crate::config::error::ValueInvalidArgs; -pub use builder::Builder; -pub use error::ValidationError; -pub(crate) use metadata::{extract_endpoint_tokens, parse_endpoint_metadata_from_yaml}; base64_serde_type!(Base64Standard, base64::STANDARD); diff --git a/src/filters/config.rs b/src/config/config_type.rs similarity index 97% rename from src/filters/config.rs rename to src/config/config_type.rs index 376d8aacfb..a2871e159a 100644 --- a/src/filters/config.rs +++ b/src/config/config_type.rs @@ -18,7 +18,7 @@ use std::convert::TryFrom; use bytes::Bytes; -use crate::filters::error::{ConvertProtoConfigError, Error}; +use crate::filters::{ConvertProtoConfigError, Error}; /// The configuration of a [`Filter`][crate::filters::Filter] from either a /// static or dynamic source. diff --git a/src/filters.rs b/src/filters.rs index fd2089f5bc..adc2e2a9a8 100644 --- a/src/filters.rs +++ b/src/filters.rs @@ -16,7 +16,6 @@ //! Filters for processing packets. -mod config; mod error; mod factory; mod read; @@ -27,19 +26,26 @@ mod write; pub(crate) mod chain; pub(crate) mod manager; -pub mod extensions; +pub mod capture_bytes; +pub mod compress; +pub mod concatenate_bytes; +pub mod debug; +pub mod load_balancer; +pub mod local_rate_limit; +pub mod metadata; +pub mod token_router; /// Prelude containing all types and traits required to implement [`Filter`] and /// [`FilterFactory`]. 
pub mod prelude { pub use super::{ - ConvertProtoConfigError, CreateFilterArgs, Error, Filter, FilterFactory, ReadContext, - ReadResponse, WriteContext, WriteResponse, + ConvertProtoConfigError, CreateFilterArgs, DynFilterFactory, Error, Filter, FilterFactory, + ReadContext, ReadResponse, WriteContext, WriteResponse, }; } +// Core Filter types pub use self::{ - config::ConfigType, error::{ConvertProtoConfigError, Error}, factory::{CreateFilterArgs, DynFilterFactory, FilterFactory}, read::{ReadContext, ReadResponse}, @@ -50,25 +56,47 @@ pub use self::{ pub(crate) use self::chain::FilterChain; -/// Filter is a trait for routing and manipulating packets. +/// Trait for routing and manipulating packets. +/// +/// An implementation of [`Filter`] provides a `read` and a `write` method. Both +/// methods are invoked by the proxy when it consults the filter chain - their +/// arguments contain information about the packet being processed. +/// - `read` is invoked when a packet is received on the local downstream port +/// and is to be sent to an upstream endpoint. +/// - `write` is invoked in the opposite direction when a packet is received +/// from an upstream endpoint and is to be sent to a downstream client. +/// +/// **Metrics** +/// +/// * `filter_read_duration_seconds` The duration it took for a `filter`'s +/// `read` implementation to execute. +/// * Labels +/// * `filter` The name of the filter being executed. +/// +/// * `filter_write_duration_seconds` The duration it took for a `filter`'s +/// `write` implementation to execute. +/// * Labels +/// * `filter` The name of the filter being executed. pub trait Filter: Send + Sync { - /// Read is invoked when the proxy receives data from a downstream connection on the - /// listening port. + /// [`Filter::read`] is invoked when the proxy receives data from a + /// downstream connection on the listening port. + /// /// This function should return a [`ReadResponse`] containing the array of - /// endpoints that the packet should be sent to and the packet that should be - /// sent (which may be manipulated) as well. - /// If the packet should be rejected, return None. - /// By default, passes the context through unchanged + /// endpoints that the packet should be sent to and the packet that should + /// be sent (which may be manipulated) as well. If the packet should be + /// rejected, return [`None`]. By default, the context passes + /// through unchanged. fn read(&self, ctx: ReadContext) -> Option<ReadResponse> { Some(ctx.into()) } - /// Write is invoked when the proxy is about to send data to a downstream connection - /// via the listening port after receiving it via one of the upstream Endpoints. + /// [`Filter::write`] is invoked when the proxy is about to send data to a + /// downstream connection via the listening port after receiving it via one + /// of the upstream Endpoints. + /// + /// This function should return a [`WriteResponse`] containing the packet to - /// be sent (which may be manipulated). - /// If the packet should be rejected, return None. - /// By default, passes the context through unchanged + /// be sent (which may be manipulated). If the packet should be rejected, + /// return [`None`]. By default, the context passes through unchanged.
fn write(&self, ctx: WriteContext) -> Option { Some(ctx.into()) } diff --git a/src/filters/extensions/capture_bytes.rs b/src/filters/capture_bytes.rs similarity index 57% rename from src/filters/extensions/capture_bytes.rs rename to src/filters/capture_bytes.rs index 53efe21788..306a569efb 100644 --- a/src/filters/extensions/capture_bytes.rs +++ b/src/filters/capture_bytes.rs @@ -14,119 +14,30 @@ * limitations under the License. */ -use std::convert::TryFrom; -use std::sync::Arc; - -use serde::{Deserialize, Serialize}; -use slog::{o, warn, Logger}; - -use metrics::Metrics; - -use crate::filters::{extensions::CAPTURED_BYTES, prelude::*}; -use crate::map_proto_enum; -use proto::quilkin::extensions::filters::capture_bytes::v1alpha1::{ - capture_bytes::Strategy as ProtoStrategy, CaptureBytes as ProtoConfig, -}; - +mod capture; +mod config; mod metrics; mod proto; -#[derive(Serialize, Deserialize, Debug, PartialEq)] -/// Strategy to apply for acquiring a set of bytes in the UDP packet -enum Strategy { - #[serde(rename = "PREFIX")] - /// Looks for the set of bytes at the beginning of the packet - Prefix, - #[serde(rename = "SUFFIX")] - /// Look for the set of bytes at the end of the packet - Suffix, -} - -#[derive(Serialize, Deserialize, Debug, PartialEq)] -struct Config { - #[serde(default)] - strategy: Strategy, - /// the number of bytes to capture - #[serde(rename = "size")] - size: usize, - /// the key to use when storing the captured bytes in the filter context - #[serde(rename = "metadataKey")] - #[serde(default = "default_metadata_key")] - metadata_key: String, - /// whether or not to remove the set of the bytes from the packet once captured - #[serde(default = "default_remove")] - remove: bool, -} - -/// default value for [`Config::remove`]. -fn default_remove() -> bool { - false -} - -/// default value for the context key in the Config -fn default_metadata_key() -> String { - CAPTURED_BYTES.into() -} +use std::sync::Arc; -impl Default for Strategy { - fn default() -> Self { - Strategy::Suffix - } -} +use slog::{o, warn, Logger}; -impl TryFrom for Config { - type Error = ConvertProtoConfigError; - - fn try_from(p: ProtoConfig) -> Result { - let strategy = p - .strategy - .map(|strategy| { - map_proto_enum!( - value = strategy.value, - field = "strategy", - proto_enum_type = ProtoStrategy, - target_enum_type = Strategy, - variants = [Suffix, Prefix] - ) - }) - .transpose()? - .unwrap_or_else(Strategy::default); - - Ok(Self { - strategy, - size: p.size as usize, - metadata_key: p.metadata_key.unwrap_or_else(default_metadata_key), - remove: p.remove.unwrap_or_else(default_remove), - }) - } -} +use crate::filters::prelude::*; -pub struct CaptureBytesFactory { - log: Logger, -} +use capture::Capture; +use metrics::Metrics; +use proto::quilkin::extensions::filters::capture_bytes::v1alpha1::CaptureBytes as ProtoConfig; -impl CaptureBytesFactory { - pub fn new(base: &Logger) -> Self { - CaptureBytesFactory { log: base.clone() } - } -} +pub use config::{Config, Strategy}; -impl FilterFactory for CaptureBytesFactory { - fn name(&self) -> &'static str { - CaptureBytes::FILTER_NAME - } +pub const NAME: &str = "quilkin.extensions.filters.capture_bytes.v1alpha1.CaptureBytes"; - fn create_filter(&self, args: CreateFilterArgs) -> Result, Error> { - Ok(Box::new(CaptureBytes::new( - &self.log, - self.require_config(args.config)? - .deserialize::(self.name())?, - Metrics::new(&args.metrics_registry)?, - ))) - } +/// Creates a new factory for generating capture filters. 
+pub fn factory(base: &Logger) -> DynFilterFactory { + Box::from(CaptureBytesFactory::new(base)) } -#[crate::filter("quilkin.extensions.filters.capture_bytes.v1alpha1.CaptureBytes")] struct CaptureBytes { log: Logger, capture: Box, @@ -139,14 +50,9 @@ struct CaptureBytes { impl CaptureBytes { fn new(base: &Logger, config: Config, metrics: Metrics) -> Self { - let capture: Box = match config.strategy { - Strategy::Prefix => Box::new(Prefix {}), - Strategy::Suffix => Box::new(Suffix {}), - }; - CaptureBytes { log: base.new(o!("source" => "extensions::CaptureBytes")), - capture, + capture: config.strategy.as_capture(), metrics, metadata_key: Arc::new(config.metadata_key), size: config.size, @@ -181,43 +87,33 @@ impl Filter for CaptureBytes { } } -/// Trait to implement different strategies for capturing packet data -trait Capture { - /// Capture the packet data from the contents. If remove is true, contents will be altered to - /// not have the retrieved set of bytes. - /// Returns the captured bytes. - fn capture(&self, contents: &mut Vec, size: usize, remove: bool) -> Vec; +struct CaptureBytesFactory { + log: Logger, } -struct Suffix; -impl Capture for Suffix { - fn capture(&self, contents: &mut Vec, size: usize, remove: bool) -> Vec { - if remove { - return contents.split_off(contents.len() - size); - } - - contents - .iter() - .skip(contents.len() - size) - .cloned() - .collect::>() +impl CaptureBytesFactory { + pub fn new(base: &Logger) -> Self { + CaptureBytesFactory { log: base.clone() } } } -struct Prefix; -impl Capture for Prefix { - fn capture(&self, contents: &mut Vec, size: usize, remove: bool) -> Vec { - if remove { - return contents.drain(..size).collect(); - } +impl FilterFactory for CaptureBytesFactory { + fn name(&self) -> &'static str { + NAME + } - contents.iter().cloned().take(size).collect() + fn create_filter(&self, args: CreateFilterArgs) -> Result, Error> { + Ok(Box::new(CaptureBytes::new( + &self.log, + self.require_config(args.config)? 
+ .deserialize::(self.name())?, + Metrics::new(&args.metrics_registry)?, + ))) } } #[cfg(test)] mod tests { - use std::convert::TryFrom; use std::sync::Arc; use prometheus::Registry; @@ -226,18 +122,13 @@ mod tests { use crate::config::Endpoints; use crate::test_utils::{assert_write_no_change, logger}; - use super::{ - default_metadata_key, default_remove, Capture, CaptureBytes, CaptureBytesFactory, Config, - Metrics, Prefix, Strategy, Suffix, - }; + use super::{CaptureBytes, CaptureBytesFactory, Config, Metrics, Strategy}; + + use super::capture::{Capture, Prefix, Suffix}; - use super::proto::quilkin::extensions::filters::capture_bytes::v1alpha1::{ - capture_bytes::{Strategy as ProtoStrategy, StrategyValue}, - CaptureBytes as ProtoConfig, - }; use crate::cluster::Endpoint; use crate::filters::{ - extensions::CAPTURED_BYTES, CreateFilterArgs, Filter, FilterFactory, ReadContext, + metadata::CAPTURED_BYTES, CreateFilterArgs, Filter, FilterFactory, ReadContext, }; const TOKEN_KEY: &str = "TOKEN"; @@ -250,66 +141,6 @@ mod tests { ) } - #[test] - fn convert_proto_config() { - let test_cases = vec![ - ( - "should succeed when all valid values are provided", - ProtoConfig { - strategy: Some(StrategyValue { - value: ProtoStrategy::Suffix as i32, - }), - size: 42, - metadata_key: Some("foobar".into()), - remove: Some(true), - }, - Some(Config { - strategy: Strategy::Suffix, - size: 42, - metadata_key: "foobar".into(), - remove: true, - }), - ), - ( - "should fail when invalid strategy is provided", - ProtoConfig { - strategy: Some(StrategyValue { value: 42 }), - size: 42, - metadata_key: Some("foobar".into()), - remove: Some(true), - }, - None, - ), - ( - "should use correct default values", - ProtoConfig { - strategy: None, - size: 42, - metadata_key: None, - remove: None, - }, - Some(Config { - strategy: Strategy::default(), - size: 42, - metadata_key: default_metadata_key(), - remove: default_remove(), - }), - ), - ]; - for (name, proto_config, expected) in test_cases { - let result = Config::try_from(proto_config); - assert_eq!( - result.is_err(), - expected.is_none(), - "{}: error expectation does not match", - name - ); - if let Some(expected) = expected { - assert_eq!(expected, result.unwrap(), "{}", name); - } - } - } - #[test] fn factory_valid_config_all() { let factory = CaptureBytesFactory::new(&logger()); diff --git a/src/filters/capture_bytes/capture.rs b/src/filters/capture_bytes/capture.rs new file mode 100644 index 0000000000..cf8c4abaad --- /dev/null +++ b/src/filters/capture_bytes/capture.rs @@ -0,0 +1,37 @@ +/// Trait to implement different strategies for capturing packet data +pub trait Capture { + /// Capture the packet data from the contents. If remove is true, contents will be altered to + /// not have the retrieved set of bytes. + /// Returns the captured bytes. + fn capture(&self, contents: &mut Vec, size: usize, remove: bool) -> Vec; +} + +/// Capture from the end of the packet. +pub struct Suffix; + +impl Capture for Suffix { + fn capture(&self, contents: &mut Vec, size: usize, remove: bool) -> Vec { + if remove { + return contents.split_off(contents.len() - size); + } + + contents + .iter() + .skip(contents.len() - size) + .cloned() + .collect::>() + } +} + +/// Capture from the start of the packet. 
+pub struct Prefix; + +impl Capture for Prefix { + fn capture(&self, contents: &mut Vec<u8>, size: usize, remove: bool) -> Vec<u8> { + if remove { + return contents.drain(..size).collect(); + } + + contents.iter().cloned().take(size).collect() + } +} diff --git a/src/filters/capture_bytes/config.rs b/src/filters/capture_bytes/config.rs new file mode 100644 index 0000000000..2dfc2a4379 --- /dev/null +++ b/src/filters/capture_bytes/config.rs @@ -0,0 +1,177 @@ +/* + * Copyright 2021 Google LLC All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use std::convert::TryFrom; + +use serde::{Deserialize, Serialize}; + +use super::proto::quilkin::extensions::filters::capture_bytes::v1alpha1::{ + capture_bytes::Strategy as ProtoStrategy, CaptureBytes as ProtoConfig, +}; +use crate::filters::{metadata::CAPTURED_BYTES, ConvertProtoConfigError}; +use crate::map_proto_enum; + +use super::capture::{Capture, Prefix, Suffix}; + +#[derive(Serialize, Deserialize, Debug, PartialEq)] +/// Strategy to apply for acquiring a set of bytes in the UDP packet +pub enum Strategy { + #[serde(rename = "PREFIX")] + /// Looks for the set of bytes at the beginning of the packet + Prefix, + #[serde(rename = "SUFFIX")] + /// Looks for the set of bytes at the end of the packet + Suffix, +} + +impl Strategy { + pub(crate) fn as_capture(&self) -> Box<dyn Capture> { + match self { + Self::Prefix => Box::new(Prefix {}), + Self::Suffix => Box::new(Suffix {}), + } + } +} + +#[derive(Serialize, Deserialize, Debug, PartialEq)] +pub struct Config { + #[serde(default)] + pub strategy: Strategy, + /// the number of bytes to capture + #[serde(rename = "size")] + pub size: usize, + /// the key to use when storing the captured bytes in the filter context + #[serde(rename = "metadataKey")] + #[serde(default = "default_metadata_key")] + pub metadata_key: String, + /// whether or not to remove the set of bytes from the packet once captured + #[serde(default = "default_remove")] + pub remove: bool, +} + +/// default value for [`Config::remove`]. +fn default_remove() -> bool { + false } + +/// default value for the context key in the Config +fn default_metadata_key() -> String { + CAPTURED_BYTES.into() +} + +impl Default for Strategy { + fn default() -> Self { + Strategy::Suffix + } +} + +impl TryFrom<ProtoConfig> for Config { + type Error = ConvertProtoConfigError; + + fn try_from(p: ProtoConfig) -> Result<Self, Self::Error> { + let strategy = p + .strategy + .map(|strategy| { + map_proto_enum!( + value = strategy.value, + field = "strategy", + proto_enum_type = ProtoStrategy, + target_enum_type = Strategy, + variants = [Suffix, Prefix] + ) + }) + .transpose()?
+ .unwrap_or_else(Strategy::default); + + Ok(Self { + strategy, + size: p.size as usize, + metadata_key: p.metadata_key.unwrap_or_else(default_metadata_key), + remove: p.remove.unwrap_or_else(default_remove), + }) + } +} + +#[cfg(test)] +mod tests { + use std::convert::TryFrom; + + use super::super::proto::quilkin::extensions::filters::capture_bytes::v1alpha1::{ + capture_bytes::{Strategy as ProtoStrategy, StrategyValue}, + CaptureBytes as ProtoConfig, + }; + use super::*; + + #[test] + fn convert_proto_config() { + let test_cases = vec![ + ( + "should succeed when all valid values are provided", + ProtoConfig { + strategy: Some(StrategyValue { + value: ProtoStrategy::Suffix as i32, + }), + size: 42, + metadata_key: Some("foobar".into()), + remove: Some(true), + }, + Some(Config { + strategy: Strategy::Suffix, + size: 42, + metadata_key: "foobar".into(), + remove: true, + }), + ), + ( + "should fail when invalid strategy is provided", + ProtoConfig { + strategy: Some(StrategyValue { value: 42 }), + size: 42, + metadata_key: Some("foobar".into()), + remove: Some(true), + }, + None, + ), + ( + "should use correct default values", + ProtoConfig { + strategy: None, + size: 42, + metadata_key: None, + remove: None, + }, + Some(Config { + strategy: Strategy::default(), + size: 42, + metadata_key: default_metadata_key(), + remove: default_remove(), + }), + ), + ]; + for (name, proto_config, expected) in test_cases { + let result = Config::try_from(proto_config); + assert_eq!( + result.is_err(), + expected.is_none(), + "{}: error expectation does not match", + name + ); + if let Some(expected) = expected { + assert_eq!(expected, result.unwrap(), "{}", name); + } + } + } +} diff --git a/src/filters/extensions/capture_bytes/metrics.rs b/src/filters/capture_bytes/metrics.rs similarity index 100% rename from src/filters/extensions/capture_bytes/metrics.rs rename to src/filters/capture_bytes/metrics.rs diff --git a/src/filters/extensions/capture_bytes/proto.rs b/src/filters/capture_bytes/proto.rs similarity index 100% rename from src/filters/extensions/capture_bytes/proto.rs rename to src/filters/capture_bytes/proto.rs diff --git a/src/filters/chain.rs b/src/filters/chain.rs index d9043e15f4..c94912d28f 100644 --- a/src/filters/chain.rs +++ b/src/filters/chain.rs @@ -153,7 +153,7 @@ mod tests { use crate::config; use crate::config::{Endpoints, UpstreamEndpoints}; - use crate::filters::{extensions::DebugFactory, FilterFactory, FilterRegistry, FilterSet}; + use crate::filters::{debug, FilterRegistry, FilterSet}; use crate::test_utils::{logger, new_test_chain, TestFilter}; use super::*; @@ -162,7 +162,7 @@ mod tests { #[test] fn from_config() { let log = logger(); - let provider = DebugFactory::new(&log); + let provider = debug::factory(&log); // everything is fine let filter_configs = vec![config::Filter { diff --git a/src/filters/extensions/compress.rs b/src/filters/compress.rs similarity index 79% rename from src/filters/extensions/compress.rs rename to src/filters/compress.rs index 0268a4aa97..4cb41f285d 100644 --- a/src/filters/extensions/compress.rs +++ b/src/filters/compress.rs @@ -14,149 +14,30 @@ * limitations under the License. 
*/ -use std::convert::TryFrom; -use std::io; - -use serde::{Deserialize, Serialize}; -use slog::{o, warn, Logger}; -use snap::read::FrameDecoder; -use snap::write::FrameEncoder; - -use self::quilkin::extensions::filters::compress::v1alpha1::{ - compress::Action as ProtoAction, compress::Mode as ProtoMode, Compress as ProtoConfig, -}; - -use crate::map_proto_enum; -use crate::{ - config::LOG_SAMPLING_RATE, - filters::{extensions::compress::metrics::Metrics, prelude::*}, -}; - +mod compressor; +mod config; mod metrics; crate::include_proto!("quilkin.extensions.filters.compress.v1alpha1"); -/// The library to use when compressing -#[derive(Serialize, Deserialize, Debug, PartialEq)] -pub enum Mode { - // we only support one mode for now, but adding in the config option to provide the - // option to expand for later. - #[serde(rename = "SNAPPY")] - Snappy, -} - -impl Default for Mode { - fn default() -> Self { - Mode::Snappy - } -} - -/// Whether to do nothing, compress or decompress the packet. -#[derive(Serialize, Deserialize, Debug, PartialEq)] -enum Action { - #[serde(rename = "DO_NOTHING")] - DoNothing, - #[serde(rename = "COMPRESS")] - Compress, - #[serde(rename = "DECOMPRESS")] - Decompress, -} +use slog::{o, warn, Logger}; -impl Default for Action { - fn default() -> Self { - Action::DoNothing - } -} +use crate::{config::LOG_SAMPLING_RATE, filters::prelude::*}; -#[derive(Serialize, Deserialize, Debug, PartialEq)] -struct Config { - #[serde(default)] - mode: Mode, - on_read: Action, - on_write: Action, -} +use self::quilkin::extensions::filters::compress::v1alpha1::Compress as ProtoConfig; +use compressor::Compressor; +use metrics::Metrics; -impl TryFrom for Config { - type Error = ConvertProtoConfigError; - - fn try_from(p: ProtoConfig) -> std::result::Result { - let mode = p - .mode - .map(|mode| { - map_proto_enum!( - value = mode.value, - field = "mode", - proto_enum_type = ProtoMode, - target_enum_type = Mode, - variants = [Snappy] - ) - }) - .transpose()? - .unwrap_or_else(Mode::default); - - let on_read = p - .on_read - .map(|on_read| { - map_proto_enum!( - value = on_read.value, - field = "on_read", - proto_enum_type = ProtoAction, - target_enum_type = Action, - variants = [DoNothing, Compress, Decompress] - ) - }) - .transpose()? - .unwrap_or_else(Action::default); - - let on_write = p - .on_write - .map(|on_write| { - map_proto_enum!( - value = on_write.value, - field = "on_write", - proto_enum_type = ProtoAction, - target_enum_type = Action, - variants = [DoNothing, Compress, Decompress] - ) - }) - .transpose()? - .unwrap_or_else(Action::default); - - Ok(Self { - mode, - on_read, - on_write, - }) - } -} +pub use config::{Action, Config, Mode}; -pub struct CompressFactory { - log: Logger, -} +pub const NAME: &str = "quilkin.extensions.filters.compress.v1alpha1.Compress"; -impl CompressFactory { - pub fn new(base: &Logger) -> Self { - CompressFactory { log: base.clone() } - } -} - -impl FilterFactory for CompressFactory { - fn name(&self) -> &'static str { - Compress::FILTER_NAME - } - - fn create_filter(&self, args: CreateFilterArgs) -> Result, Error> { - Ok(Box::new(Compress::new( - &self.log, - self.require_config(args.config)? - .deserialize::(self.name())?, - Metrics::new(&args.metrics_registry)?, - ))) - } +/// Returns a factory for creating compression filters. 
+pub fn factory(base: &Logger) -> DynFilterFactory { + Box::from(CompressFactory::new(base)) } /// Filter for compressing and decompressing packet data -#[crate::filter("quilkin.extensions.filters.compress.v1alpha1.Compress")] struct Compress { log: Logger, metrics: Metrics, @@ -167,22 +48,19 @@ } impl Compress { - pub fn new(base: &Logger, config: Config, metrics: Metrics) -> Self { - let compressor = match config.mode { - Mode::Snappy => Box::new(Snappy {}), - }; - Compress { + fn new(base: &Logger, config: Config, metrics: Metrics) -> Self { + Self { log: base.new(o!("source" => "extensions::Compress")), metrics, + compressor: config.mode.as_compressor(), compression_mode: config.mode, on_read: config.on_read, on_write: config.on_write, - compressor, } } /// Track a failed attempt at compression - fn failed_compression<T>(&self, err: Box<dyn std::error::Error>) -> Option<T> { + fn failed_compression<T>(&self, err: &dyn std::error::Error) -> Option<T> { if self.metrics.packets_dropped_compress.get() % LOG_SAMPLING_RATE == 0 { warn!(self.log, "Packets are being dropped as they could not be compressed"; "mode" => #?self.compression_mode, "error" => %err, @@ -193,7 +71,7 @@ impl Compress { } /// Track a failed attempt at decompression - fn failed_decompression<T>(&self, err: Box<dyn std::error::Error>) -> Option<T> { + fn failed_decompression<T>(&self, err: &dyn std::error::Error) -> Option<T> { if self.metrics.packets_dropped_decompress.get() % LOG_SAMPLING_RATE == 0 { warn!(self.log, "Packets are being dropped as they could not be decompressed"; "mode" => #?self.compression_mode, "error" => %err, @@ -219,7 +97,7 @@ impl Filter for Compress { .inc_by(ctx.contents.len() as u64); Some(ctx.into()) } - Err(err) => self.failed_compression(err), + Err(err) => self.failed_compression(&err), }, Action::Decompress => match self.compressor.decode(&mut ctx.contents) { Ok(()) => { @@ -231,7 +109,7 @@ impl Filter for Compress { .inc_by(ctx.contents.len() as u64); Some(ctx.into()) } - Err(err) => self.failed_decompression(err), + Err(err) => self.failed_decompression(&err), }, Action::DoNothing => Some(ctx.into()), } @@ -250,7 +128,7 @@ impl Filter for Compress { .inc_by(ctx.contents.len() as u64); Some(ctx.into()) } - Err(err) => self.failed_compression(err), + Err(err) => self.failed_compression(&err), }, Action::Decompress => match self.compressor.decode(&mut ctx.contents) { Ok(()) => { @@ -263,40 +141,35 @@ impl Filter for Compress { Some(ctx.into()) - Err(err) => self.failed_decompression(err), + Err(err) => self.failed_decompression(&err), }, Action::DoNothing => Some(ctx.into()), } } } -type Result<T, E = Box<dyn std::error::Error>> = std::result::Result<T, E>; - -/// A trait that provides a compression and decompression strategy for this filter. -/// Conversion takes place on a mutable Vec, to ensure the most performant compression or -/// decompression operation can occur. -trait Compressor { - /// Compress the contents of the Vec - overwriting the original content. - fn encode(&self, contents: &mut Vec<u8>) -> Result<()>; - /// Decompress the contents of the Vec - overwriting the original content.
- fn decode(&self, contents: &mut Vec) -> Result<()>; +struct CompressFactory { + log: Logger, } -struct Snappy {} +impl CompressFactory { + pub fn new(base: &Logger) -> Self { + CompressFactory { log: base.clone() } + } +} -impl Compressor for Snappy { - fn encode(&self, contents: &mut Vec) -> Result<()> { - let input = std::mem::take(contents); - let mut wtr = FrameEncoder::new(contents); - io::copy(&mut input.as_slice(), &mut wtr)?; - Ok(()) +impl FilterFactory for CompressFactory { + fn name(&self) -> &'static str { + NAME } - fn decode(&self, contents: &mut Vec) -> Result<()> { - let input = std::mem::take(contents); - let mut rdr = FrameDecoder::new(input.as_slice()); - io::copy(&mut rdr, contents)?; - Ok(()) + fn create_filter(&self, args: CreateFilterArgs) -> Result, Error> { + Ok(Box::new(Compress::new( + &self.log, + self.require_config(args.config)? + .deserialize::(self.name())?, + Metrics::new(&args.metrics_registry)?, + ))) } } @@ -310,8 +183,8 @@ mod tests { use crate::cluster::Endpoint; use crate::config::{Endpoints, UpstreamEndpoints}; use crate::filters::{ - extensions::compress::Compressor, CreateFilterArgs, Filter, FilterFactory, ReadContext, - WriteContext, + compress::{compressor::Snappy, Compressor}, + CreateFilterArgs, Filter, FilterFactory, ReadContext, WriteContext, }; use crate::test_utils::logger; @@ -319,7 +192,7 @@ mod tests { compress::{Action as ProtoAction, ActionValue, Mode as ProtoMode, ModeValue}, Compress as ProtoConfig, }; - use super::{Action, Compress, CompressFactory, Config, Metrics, Mode, Snappy}; + use super::{Action, Compress, CompressFactory, Config, Metrics, Mode}; #[test] fn convert_proto_config() { diff --git a/src/filters/compress/compressor.rs b/src/filters/compress/compressor.rs new file mode 100644 index 0000000000..62563934eb --- /dev/null +++ b/src/filters/compress/compressor.rs @@ -0,0 +1,48 @@ +/* + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use std::io; + +use snap::read::FrameDecoder; +use snap::write::FrameEncoder; + +/// A trait that provides a compression and decompression strategy for this filter. +/// Conversion takes place on a mutable Vec, to ensure the most performant compression or +/// decompression operation can occur. +pub(crate) trait Compressor { + /// Compress the contents of the Vec - overwriting the original content. + fn encode(&self, contents: &mut Vec) -> io::Result<()>; + /// Decompress the contents of the Vec - overwriting the original content. 
+ fn decode(&self, contents: &mut Vec<u8>) -> io::Result<()>; +} + +pub(crate) struct Snappy {} + +impl Compressor for Snappy { + fn encode(&self, contents: &mut Vec<u8>) -> io::Result<()> { + let input = std::mem::take(contents); + let mut wtr = FrameEncoder::new(contents); + io::copy(&mut input.as_slice(), &mut wtr)?; + Ok(()) + } + + fn decode(&self, contents: &mut Vec<u8>) -> io::Result<()> { + let input = std::mem::take(contents); + let mut rdr = FrameDecoder::new(input.as_slice()); + io::copy(&mut rdr, contents)?; + Ok(()) + } +} diff --git a/src/filters/compress/config.rs b/src/filters/compress/config.rs new file mode 100644 index 0000000000..9c5f8677f2 --- /dev/null +++ b/src/filters/compress/config.rs @@ -0,0 +1,129 @@ +/* + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use std::convert::TryFrom; + +use serde::{Deserialize, Serialize}; + +use super::compressor::{Compressor, Snappy}; +use super::quilkin::extensions::filters::compress::v1alpha1::{ + compress::Action as ProtoAction, compress::Mode as ProtoMode, Compress as ProtoConfig, +}; +use crate::{filters::ConvertProtoConfigError, map_proto_enum}; + +/// The library to use when compressing. +#[derive(Clone, Copy, Deserialize, Debug, PartialEq, Serialize)] +#[non_exhaustive] +pub enum Mode { + // we only support one mode for now, but adding in the config option to + // provide the option to expand for later. + #[serde(rename = "SNAPPY")] + Snappy, +} + +impl Mode { + pub(crate) fn as_compressor(&self) -> Box<dyn Compressor> { + match self { + Self::Snappy => Box::from(Snappy {}), + } + } +} + +impl Default for Mode { + fn default() -> Self { + Mode::Snappy + } +} + +/// Whether to do nothing, compress or decompress the packet. +#[derive(Clone, Copy, Deserialize, Debug, PartialEq, Serialize)] +pub enum Action { + #[serde(rename = "DO_NOTHING")] + DoNothing, + #[serde(rename = "COMPRESS")] + Compress, + #[serde(rename = "DECOMPRESS")] + Decompress, +} + +impl Default for Action { + fn default() -> Self { + Action::DoNothing + } +} + +#[derive(Clone, Copy, Deserialize, Debug, PartialEq, Serialize)] +#[non_exhaustive] +pub struct Config { + #[serde(default)] + pub mode: Mode, + pub on_read: Action, + pub on_write: Action, +} + +impl TryFrom<ProtoConfig> for Config { + type Error = ConvertProtoConfigError; + + fn try_from(p: ProtoConfig) -> std::result::Result<Self, Self::Error> { + let mode = p + .mode + .map(|mode| { + map_proto_enum!( + value = mode.value, + field = "mode", + proto_enum_type = ProtoMode, + target_enum_type = Mode, + variants = [Snappy] + ) + }) + .transpose()? + .unwrap_or_else(Mode::default); + + let on_read = p + .on_read + .map(|on_read| { + map_proto_enum!( + value = on_read.value, + field = "on_read", + proto_enum_type = ProtoAction, + target_enum_type = Action, + variants = [DoNothing, Compress, Decompress] + ) + }) + .transpose()?
+ .unwrap_or_else(Action::default); + + let on_write = p + .on_write + .map(|on_write| { + map_proto_enum!( + value = on_write.value, + field = "on_write", + proto_enum_type = ProtoAction, + target_enum_type = Action, + variants = [DoNothing, Compress, Decompress] + ) + }) + .transpose()? + .unwrap_or_else(Action::default); + + Ok(Self { + mode, + on_read, + on_write, + }) + } +} diff --git a/src/filters/extensions/compress/metrics.rs b/src/filters/compress/metrics.rs similarity index 100% rename from src/filters/extensions/compress/metrics.rs rename to src/filters/compress/metrics.rs diff --git a/src/filters/extensions/compress/proto.rs b/src/filters/compress/proto.rs similarity index 100% rename from src/filters/extensions/compress/proto.rs rename to src/filters/compress/proto.rs diff --git a/src/filters/concatenate_bytes.rs b/src/filters/concatenate_bytes.rs new file mode 100644 index 0000000000..ff3253fab0 --- /dev/null +++ b/src/filters/concatenate_bytes.rs @@ -0,0 +1,95 @@ +/* + * Copyright 2020 Google LLC All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +mod config; + +use crate::filters::prelude::*; + +use config::ProtoConfig; +pub use config::{Config, Strategy}; + +pub const NAME: &str = "quilkin.extensions.filters.concatenate_bytes.v1alpha1.ConcatenateBytes"; + +/// Returns a factory for creating concatenation filters. +pub fn factory() -> DynFilterFactory { + Box::from(ConcatBytesFactory) +} + +/// The `ConcatenateBytes` filter's job is to add a byte packet to either the +/// beginning or end of each UDP packet that passes through. This is commonly +/// used to provide an auth token to each packet, so they can be +/// routed appropriately. +struct ConcatenateBytes { + on_read: Strategy, + on_write: Strategy, + bytes: Vec, +} + +impl ConcatenateBytes { + pub fn new(config: Config) -> Self { + ConcatenateBytes { + on_read: config.on_read, + on_write: config.on_write, + bytes: config.bytes, + } + } +} + +impl Filter for ConcatenateBytes { + fn read(&self, mut ctx: ReadContext) -> Option { + match self.on_read { + Strategy::Append => { + ctx.contents.extend(self.bytes.iter()); + } + Strategy::Prepend => { + ctx.contents.splice(..0, self.bytes.iter().cloned()); + } + Strategy::DoNothing => {} + } + + Some(ctx.into()) + } + + fn write(&self, mut ctx: WriteContext) -> Option { + match self.on_write { + Strategy::Append => { + ctx.contents.extend(self.bytes.iter()); + } + Strategy::Prepend => { + ctx.contents.splice(..0, self.bytes.iter().cloned()); + } + Strategy::DoNothing => {} + } + + Some(ctx.into()) + } +} + +#[derive(Default)] +struct ConcatBytesFactory; + +impl FilterFactory for ConcatBytesFactory { + fn name(&self) -> &'static str { + NAME + } + + fn create_filter(&self, args: CreateFilterArgs) -> Result, Error> { + Ok(Box::new(ConcatenateBytes::new( + self.require_config(args.config)? 
+ .deserialize::(self.name())?, + ))) + } +} diff --git a/src/filters/concatenate_bytes/config.rs b/src/filters/concatenate_bytes/config.rs new file mode 100644 index 0000000000..f44089771e --- /dev/null +++ b/src/filters/concatenate_bytes/config.rs @@ -0,0 +1,101 @@ +/* + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +crate::include_proto!("quilkin.extensions.filters.concatenate_bytes.v1alpha1"); + +use std::convert::TryFrom; + +use base64_serde::base64_serde_type; +use serde::{Deserialize, Serialize}; + +use crate::{filters::prelude::*, map_proto_enum}; + +use self::quilkin::extensions::filters::concatenate_bytes::v1alpha1::concatenate_bytes::Strategy as ProtoStrategy; + +pub use self::quilkin::extensions::filters::concatenate_bytes::v1alpha1::ConcatenateBytes as ProtoConfig; + +base64_serde_type!(Base64Standard, base64::STANDARD); + +#[derive(Serialize, Deserialize, Debug, PartialEq)] +pub enum Strategy { + #[serde(rename = "APPEND")] + Append, + #[serde(rename = "PREPEND")] + Prepend, + #[serde(rename = "DO_NOTHING")] + DoNothing, +} + +impl Default for Strategy { + fn default() -> Self { + Strategy::DoNothing + } +} + +/// Config represents a `ConcatenateBytes` filter configuration. +#[derive(Serialize, Deserialize, Debug, PartialEq)] +#[non_exhaustive] +pub struct Config { + /// Whether or not to `append` or `prepend` or `do nothing` on Filter `Read` + #[serde(default)] + pub on_read: Strategy, + /// Whether or not to `append` or `prepend` or `do nothing` on Filter `Write` + #[serde(default)] + pub on_write: Strategy, + + #[serde(with = "Base64Standard")] + pub bytes: Vec, +} + +impl TryFrom for Config { + type Error = ConvertProtoConfigError; + + fn try_from(p: ProtoConfig) -> Result { + let on_read = p + .on_read + .map(|strategy| { + map_proto_enum!( + value = strategy.value, + field = "on_read", + proto_enum_type = ProtoStrategy, + target_enum_type = Strategy, + variants = [DoNothing, Append, Prepend] + ) + }) + .transpose()? + .unwrap_or_else(Strategy::default); + + let on_write = p + .on_write + .map(|strategy| { + map_proto_enum!( + value = strategy.value, + field = "on_write", + proto_enum_type = ProtoStrategy, + target_enum_type = Strategy, + variants = [DoNothing, Append, Prepend] + ) + }) + .transpose()? 
+ .unwrap_or_else(Strategy::default); + + Ok(Self { + on_read, + on_write, + bytes: p.bytes, + }) + } +} diff --git a/src/filters/extensions/debug.rs b/src/filters/debug.rs similarity index 91% rename from src/filters/extensions/debug.rs rename to src/filters/debug.rs index 1d19266f48..d10dab3ec9 100644 --- a/src/filters/extensions/debug.rs +++ b/src/filters/debug.rs @@ -25,12 +25,18 @@ crate::include_proto!("quilkin.extensions.filters.debug.v1alpha1"); use self::quilkin::extensions::filters::debug::v1alpha1::Debug as ProtoDebug; /// Debug logs all incoming and outgoing packets -#[crate::filter("quilkin.extensions.filters.debug.v1alpha1.Debug")] #[derive(Debug)] -pub struct Debug { +struct Debug { log: Logger, } +pub const NAME: &str = "quilkin.extensions.filters.debug.v1alpha1.Debug"; + +/// Creates a new factory for generating debug filters. +pub fn factory(base: &Logger) -> DynFilterFactory { + Box::from(DebugFactory::new(base)) +} + impl Debug { /// Constructor for the Debug. Pass in a "id" to append a string to your log messages from this /// Filter. @@ -44,22 +50,32 @@ impl Debug { } } -/// A Debug filter's configuration. -#[derive(Serialize, Deserialize, Debug)] -struct Config { - id: Option<String>, -} +impl Filter for Debug { + fn read(&self, ctx: ReadContext) -> Option<ReadResponse> { + info!(self.log, "Read filter event"; "from" => ctx.from, "contents" => packet_to_string(ctx.contents.clone())); + Some(ctx.into()) + } -impl TryFrom<ProtoDebug> for Config { - type Error = ConvertProtoConfigError; + fn write(&self, ctx: WriteContext) -> Option<WriteResponse> { + info!(self.log, "Write filter event"; "endpoint" => ctx.endpoint.address, + "from" => ctx.from, + "to" => ctx.to, + "contents" => packet_to_string(ctx.contents.clone())); + Some(ctx.into()) + } +} - fn try_from(p: ProtoDebug) -> Result<Self, Self::Error> { - Ok(Config { id: p.id }) +/// packet_to_string takes the content, and attempts to convert it to a string. +/// Returns a string of "error decoding packet as UTF-8" on failure. +fn packet_to_string(contents: Vec<u8>) -> String { + match String::from_utf8(contents) { + Ok(str) => str, + Err(_) => String::from("error decoding packet as UTF-8"), + } } /// Factory for the Debug -pub struct DebugFactory { +struct DebugFactory { log: Logger, } @@ -71,7 +87,7 @@ impl DebugFactory { impl FilterFactory for DebugFactory { fn name(&self) -> &'static str { - Debug::FILTER_NAME + NAME } fn create_filter(&self, args: CreateFilterArgs) -> Result<Box<dyn Filter>, Error> { @@ -86,27 +102,18 @@ impl FilterFactory for DebugFactory { } } -impl Filter for Debug { - fn read(&self, ctx: ReadContext) -> Option<ReadResponse> { - info!(self.log, "Read filter event"; "from" => ctx.from, "contents" => packet_to_string(ctx.contents.clone())); - Some(ctx.into()) - } - - fn write(&self, ctx: WriteContext) -> Option<WriteResponse> { - info!(self.log, "Write filter event"; "endpoint" => ctx.endpoint.address, - "from" => ctx.from, - "to" => ctx.to, - "contents" => packet_to_string(ctx.contents.clone())); - Some(ctx.into()) - } +/// A Debug filter's configuration. +#[derive(Serialize, Deserialize, Debug)] +pub struct Config { + /// Identifier that will be optionally included with each log message. + pub id: Option<String>, } -/// packet_to_string takes the content, and attempts to convert it to a string. -/// Returns a string of "error decoding packet" on failure.
-fn packet_to_string(contents: Vec<u8>) -> String { - match String::from_utf8(contents) { - Ok(str) => str, - Err(_) => String::from("error decoding packet"), +impl TryFrom<ProtoDebug> for Config { + type Error = ConvertProtoConfigError; + + fn try_from(p: ProtoDebug) -> Result<Self, Self::Error> { + Ok(Config { id: p.id }) } } diff --git a/src/filters/factory.rs b/src/filters/factory.rs index 01901478cb..587d8d2ba8 100644 --- a/src/filters/factory.rs +++ b/src/filters/factory.rs @@ -16,12 +16,16 @@ use prometheus::Registry; -use crate::filters::{ConfigType, Error, Filter}; +use crate::{ + config::ConfigType, + filters::{Error, Filter}, +}; /// An owned pointer to a dynamic [`FilterFactory`] instance. pub type DynFilterFactory = Box<dyn FilterFactory>; /// Provides the name and creation function for a given [`Filter`]. +/// pub trait FilterFactory: Sync + Send { /// name returns the configuration name for the Filter /// The returned string identifies the filter item's path with the following format: diff --git a/src/filters/load_balancer.rs b/src/filters/load_balancer.rs new file mode 100644 index 0000000000..0fc982030c --- /dev/null +++ b/src/filters/load_balancer.rs @@ -0,0 +1,178 @@ +/* + * Copyright 2020 Google LLC All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +mod config; +mod endpoint_chooser; + +use crate::filters::{prelude::*, DynFilterFactory}; + +use config::ProtoConfig; +use endpoint_chooser::EndpointChooser; + +pub use config::{Config, Policy}; + +pub const NAME: &str = "quilkin.extensions.filters.load_balancer.v1alpha1.LoadBalancer"; + +/// Returns a factory for creating load balancing filters. +pub fn factory() -> DynFilterFactory { + Box::from(LoadBalancerFilterFactory) +} + +/// Balances packets over the upstream endpoints. +struct LoadBalancer { + endpoint_chooser: Box<dyn EndpointChooser>, +} + +impl Filter for LoadBalancer { + fn read(&self, mut ctx: ReadContext) -> Option<ReadResponse> { + self.endpoint_chooser.choose_endpoints(&mut ctx.endpoints); + Some(ctx.into()) + } +} + +struct LoadBalancerFilterFactory; + +impl FilterFactory for LoadBalancerFilterFactory { + fn name(&self) -> &'static str { + NAME + } + + fn create_filter(&self, args: CreateFilterArgs) -> Result<Box<dyn Filter>, Error> { + let config: Config = self + .require_config(args.config)?
diff --git a/src/filters/load_balancer.rs b/src/filters/load_balancer.rs
new file mode 100644
index 0000000000..0fc982030c
--- /dev/null
+++ b/src/filters/load_balancer.rs
@@ -0,0 +1,178 @@
+/*
+ * Copyright 2020 Google LLC All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+mod config;
+mod endpoint_chooser;
+
+use crate::filters::{prelude::*, DynFilterFactory};
+
+use config::ProtoConfig;
+use endpoint_chooser::EndpointChooser;
+
+pub use config::{Config, Policy};
+
+pub const NAME: &str = "quilkin.extensions.filters.load_balancer.v1alpha1.LoadBalancer";
+
+/// Returns a factory for creating load balancing filters.
+pub fn factory() -> DynFilterFactory {
+    Box::from(LoadBalancerFilterFactory)
+}
+
+/// Balances packets over the upstream endpoints.
+struct LoadBalancer {
+    endpoint_chooser: Box<dyn EndpointChooser>,
+}
+
+impl Filter for LoadBalancer {
+    fn read(&self, mut ctx: ReadContext) -> Option<ReadResponse> {
+        self.endpoint_chooser.choose_endpoints(&mut ctx.endpoints);
+        Some(ctx.into())
+    }
+}
+
+struct LoadBalancerFilterFactory;
+
+impl FilterFactory for LoadBalancerFilterFactory {
+    fn name(&self) -> &'static str {
+        NAME
+    }
+
+    fn create_filter(&self, args: CreateFilterArgs) -> Result<Box<dyn Filter>, Error> {
+        let config: Config = self
+            .require_config(args.config)?
+            .deserialize::<Config, ProtoConfig>(self.name())?;
+
+        Ok(Box::new(LoadBalancer {
+            endpoint_chooser: config.policy.as_endpoint_chooser(),
+        }))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::collections::HashSet;
+    use std::net::SocketAddr;
+
+    use crate::cluster::Endpoint;
+    use crate::config::Endpoints;
+    use crate::filters::{
+        load_balancer::LoadBalancerFilterFactory, CreateFilterArgs, Filter, FilterFactory,
+        ReadContext,
+    };
+    use prometheus::Registry;
+
+    fn create_filter(config: &str) -> Box<dyn Filter> {
+        let factory = LoadBalancerFilterFactory;
+        factory
+            .create_filter(CreateFilterArgs::fixed(
+                Registry::default(),
+                Some(&serde_yaml::from_str(config).unwrap()),
+            ))
+            .unwrap()
+    }
+
+    fn get_response_addresses(
+        filter: &dyn Filter,
+        input_addresses: &[SocketAddr],
+    ) -> Vec<SocketAddr> {
+        filter
+            .read(ReadContext::new(
+                Endpoints::new(
+                    input_addresses
+                        .iter()
+                        .map(|addr| Endpoint::from_address(*addr))
+                        .collect(),
+                )
+                .unwrap()
+                .into(),
+                "127.0.0.1:8080".parse().unwrap(),
+                vec![],
+            ))
+            .unwrap()
+            .endpoints
+            .iter()
+            .map(|ep| ep.address)
+            .collect::<Vec<_>>()
+    }
+
+    #[test]
+    fn round_robin_load_balancer_policy() {
+        let addresses = vec![
+            "127.0.0.1:8080".parse().unwrap(),
+            "127.0.0.2:8080".parse().unwrap(),
+            "127.0.0.3:8080".parse().unwrap(),
+        ];
+
+        let yaml = "
+policy: ROUND_ROBIN
+";
+        let filter = create_filter(yaml);
+
+        // Check that we repeat the same addresses in sequence forever.
+        let expected_sequence = addresses.iter().map(|addr| vec![*addr]).collect::<Vec<_>>();
+
+        for _ in 0..10 {
+            assert_eq!(
+                expected_sequence,
+                (0..addresses.len())
+                    .map(|_| get_response_addresses(filter.as_ref(), &addresses))
+                    .collect::<Vec<_>>()
+            );
+        }
+    }
+
+    #[test]
+    fn random_load_balancer_policy() {
+        let addresses = vec![
+            "127.0.0.1:8080".parse().unwrap(),
+            "127.0.0.2:8080".parse().unwrap(),
+            "127.0.0.3:8080".parse().unwrap(),
+        ];
+
+        let yaml = "
+policy: RANDOM
+";
+        let filter = create_filter(yaml);
+
+        // Run a few selection rounds through the addresses.
+        let mut result_sequences = vec![];
+        for _ in 0..10 {
+            let sequence = (0..addresses.len())
+                .map(|_| get_response_addresses(filter.as_ref(), &addresses))
+                .collect::<Vec<_>>();
+            result_sequences.push(sequence);
+        }
+
+        // Check that every address was chosen at least once.
+        assert_eq!(
+            addresses.into_iter().collect::<HashSet<_>>(),
+            result_sequences
+                .clone()
+                .into_iter()
+                .flatten()
+                .flatten()
+                .collect::<HashSet<_>>(),
+        );
+
+        // Check that there is at least one different sequence of addresses.
+        assert!(
+            &result_sequences[1..]
+                .iter()
+                .any(|seq| seq != &result_sequences[0]),
+            "the same sequence of addresses was chosen by the random load balancer"
+        );
+    }
+}
diff --git a/src/filters/load_balancer/config.rs b/src/filters/load_balancer/config.rs
new file mode 100644
index 0000000000..5ff4e12b08
--- /dev/null
+++ b/src/filters/load_balancer/config.rs
@@ -0,0 +1,149 @@
+/*
+ * Copyright 2021 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+crate::include_proto!("quilkin.extensions.filters.load_balancer.v1alpha1");
+
+use std::convert::TryFrom;
+
+use serde::{Deserialize, Serialize};
+
+use self::quilkin::extensions::filters::load_balancer::v1alpha1::load_balancer::Policy as ProtoPolicy;
+use super::endpoint_chooser::{EndpointChooser, RandomEndpointChooser, RoundRobinEndpointChooser};
+use crate::{filters::ConvertProtoConfigError, map_proto_enum};
+
+pub use self::quilkin::extensions::filters::load_balancer::v1alpha1::LoadBalancer as ProtoConfig;
+
+/// The configuration for [`load_balancer`][super].
+#[derive(Serialize, Deserialize, Debug, PartialEq)]
+#[non_exhaustive]
+pub struct Config {
+    #[serde(default)]
+    pub policy: Policy,
+}
+
+impl TryFrom<ProtoConfig> for Config {
+    type Error = ConvertProtoConfigError;
+
+    fn try_from(p: ProtoConfig) -> Result<Self, Self::Error> {
+        let policy = p
+            .policy
+            .map(|policy| {
+                map_proto_enum!(
+                    value = policy.value,
+                    field = "policy",
+                    proto_enum_type = ProtoPolicy,
+                    target_enum_type = Policy,
+                    variants = [RoundRobin, Random]
+                )
+            })
+            .transpose()?
+            .unwrap_or_else(Policy::default);
+        Ok(Self { policy })
+    }
+}
+
+/// Policy represents how a [`load_balancer`][super] distributes
+/// packets across endpoints.
+#[derive(Debug, Deserialize, Serialize, Eq, PartialEq)]
+pub enum Policy {
+    /// Send packets to endpoints in turns.
+    #[serde(rename = "ROUND_ROBIN")]
+    RoundRobin,
+    /// Send packets to endpoints chosen at random.
+    #[serde(rename = "RANDOM")]
+    Random,
+}
+
+impl Policy {
+    pub fn as_endpoint_chooser(&self) -> Box<dyn EndpointChooser> {
+        match self {
+            Policy::RoundRobin => Box::new(RoundRobinEndpointChooser::new()),
+            Policy::Random => Box::new(RandomEndpointChooser),
+        }
+    }
+}
+
+impl Default for Policy {
+    fn default() -> Self {
+        Policy::RoundRobin
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::convert::TryFrom;
+
+    use super::{
+        quilkin::extensions::filters::load_balancer::v1alpha1::load_balancer::{
+            Policy as ProtoPolicy, PolicyValue,
+        },
+        Config, Policy, ProtoConfig,
+    };
+
+    #[test]
+    fn convert_proto_config() {
+        let test_cases = vec![
+            (
+                "RandomPolicy",
+                ProtoConfig {
+                    policy: Some(PolicyValue {
+                        value: ProtoPolicy::Random as i32,
+                    }),
+                },
+                Some(Config {
+                    policy: Policy::Random,
+                }),
+            ),
+            (
+                "RoundRobinPolicy",
+                ProtoConfig {
+                    policy: Some(PolicyValue {
+                        value: ProtoPolicy::RoundRobin as i32,
+                    }),
+                },
+                Some(Config {
+                    policy: Policy::RoundRobin,
+                }),
+            ),
+            (
+                "should fail when invalid policy is provided",
+                ProtoConfig {
+                    policy: Some(PolicyValue { value: 42 }),
+                },
+                None,
+            ),
+            (
+                "should use correct default values",
+                ProtoConfig { policy: None },
+                Some(Config {
+                    policy: Policy::default(),
+                }),
+            ),
+        ];
+        for (name, proto_config, expected) in test_cases {
+            let result = Config::try_from(proto_config);
+            assert_eq!(
+                result.is_err(),
+                expected.is_none(),
+                "{}: error expectation does not match",
+                name
+            );
+            if let Some(expected) = expected {
+                assert_eq!(expected, result.unwrap(), "{}", name);
+            }
+        }
+    }
+}
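Since `Config` and `Policy` are plain serde types, the YAML strings used in the tests map onto them directly. A small sketch (not part of the patch; it assumes `serde_yaml`, as the tests do) of the two documented policies plus the `#[serde(default)]` fallback:

```rust
use quilkin::filters::load_balancer::{Config, Policy};

fn main() {
    // Explicit policy, matching the YAML used in the integration tests.
    let config: Config = serde_yaml::from_str("policy: RANDOM").unwrap();
    assert_eq!(config.policy, Policy::Random);

    // `#[serde(default)]` on the field means an empty mapping is valid
    // and falls back to ROUND_ROBIN.
    let config: Config = serde_yaml::from_str("{}").unwrap();
    assert_eq!(config.policy, Policy::RoundRobin);
}
```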
diff --git a/src/filters/load_balancer/endpoint_chooser.rs b/src/filters/load_balancer/endpoint_chooser.rs
new file mode 100644
index 0000000000..ef22859223
--- /dev/null
+++ b/src/filters/load_balancer/endpoint_chooser.rs
@@ -0,0 +1,62 @@
+/*
+ * Copyright 2021 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+use std::sync::atomic::{AtomicUsize, Ordering};
+
+use rand::{thread_rng, Rng};
+
+use crate::config::UpstreamEndpoints;
+
+/// EndpointChooser chooses from a set of endpoints that a proxy is connected to.
+pub trait EndpointChooser: Send + Sync {
+    /// choose_endpoints asks for the next endpoint(s) to use.
+    fn choose_endpoints(&self, endpoints: &mut UpstreamEndpoints);
+}
+
+/// RoundRobinEndpointChooser chooses endpoints in round-robin order.
+pub struct RoundRobinEndpointChooser {
+    next_endpoint: AtomicUsize,
+}
+
+impl RoundRobinEndpointChooser {
+    pub fn new() -> Self {
+        RoundRobinEndpointChooser {
+            next_endpoint: AtomicUsize::new(0),
+        }
+    }
+}
+
+impl EndpointChooser for RoundRobinEndpointChooser {
+    fn choose_endpoints(&self, endpoints: &mut UpstreamEndpoints) {
+        let count = self.next_endpoint.fetch_add(1, Ordering::Relaxed);
+        // Note: Unwrap is safe here because the index is guaranteed to be in range.
+        let num_endpoints = endpoints.size();
+        endpoints.keep(count % num_endpoints)
+            .expect("BUG: unwrap should have been safe because index into endpoints list should be in range");
+    }
+}
+
+/// RandomEndpointChooser chooses endpoints in random order.
+pub struct RandomEndpointChooser;
+
+impl EndpointChooser for RandomEndpointChooser {
+    fn choose_endpoints(&self, endpoints: &mut UpstreamEndpoints) {
+        // Note: Unwrap is safe here because the index is guaranteed to be in range.
+        let idx = (&mut thread_rng()).gen_range(0..endpoints.size());
+        endpoints.keep(idx)
+            .expect("BUG: unwrap should have been safe because index into endpoints list should be in range");
+    }
+}
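`EndpointChooser` stays crate-private, so new balancing policies plug in next to the two above. A hypothetical sketch of a third chooser, written as if it lived in this same `endpoint_chooser.rs` (`FirstEndpointChooser` is illustrative only and not part of the patch):

```rust
use crate::config::UpstreamEndpoints;

/// Hypothetical chooser that always keeps the first endpoint.
pub struct FirstEndpointChooser;

impl EndpointChooser for FirstEndpointChooser {
    fn choose_endpoints(&self, endpoints: &mut UpstreamEndpoints) {
        // Index 0 should always be in range, since an UpstreamEndpoints
        // set is constructed from a non-empty endpoint list.
        endpoints.keep(0).expect("BUG: endpoints list should never be empty");
    }
}
```

A real addition would also grow the `Policy` enum and its `as_endpoint_chooser` mapping in `config.rs`.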
- #[serde(with = "humantime_serde", default = "default_period")] - period: Duration, -} +pub const NAME: &str = "quilkin.extensions.filters.local_rate_limit.v1alpha1.LocalRateLimit"; -/// default value for [`Config::period`] -fn default_period() -> Duration { - Duration::from_secs(1) -} -impl TryFrom for Config { - type Error = ConvertProtoConfigError; - - fn try_from(p: ProtoConfig) -> Result { - Ok(Self { - max_packets: p.max_packets as usize, - period: p - .period - .map(|period| { - period.try_into().map_err(|err| { - ConvertProtoConfigError::new( - format!("invalid duration: {:?}", err), - Some("period".into()), - ) - }) - }) - .transpose()? - .unwrap_or_else(default_period), - }) - } +/// Creates a new factory for generating rate limiting filters. +pub fn factory() -> DynFilterFactory { + Box::from(LocalRateLimitFactory) } -/// Creates instances of RateLimitFilter. -#[derive(Default)] -pub struct RateLimitFilterFactory; - -/// A filter that implements rate limiting on packets based on -/// the token-bucket algorithm. -/// Packets that violate the rate limit are dropped. -/// It only applies rate limiting on packets that are destined for the -/// proxy's endpoints. All other packets flow through the filter untouched. -#[crate::filter("quilkin.extensions.filters.local_rate_limit.v1alpha1.LocalRateLimit")] -struct RateLimitFilter { +/// A filter that implements rate limiting on packets based on the token-bucket +/// algorithm. Packets that violate the rate limit are dropped. It only +/// applies rate limiting on packets that are destined for the proxy's +/// endpoints. All other packets flow through the filter untouched. +struct LocalRateLimit { /// available_tokens is how many tokens are left in the bucket any /// any given moment. available_tokens: Arc, @@ -90,32 +53,8 @@ struct RateLimitFilter { shutdown_tx: Option>, } -impl FilterFactory for RateLimitFilterFactory { - fn name(&self) -> &'static str { - RateLimitFilter::FILTER_NAME - } - - fn create_filter(&self, args: CreateFilterArgs) -> Result, Error> { - let config: Config = self - .require_config(args.config)? - .deserialize::(self.name())?; - - if config.period.lt(&Duration::from_millis(100)) { - Err(Error::FieldInvalid { - field: "period".into(), - reason: "value must be at least 100ms".into(), - }) - } else { - Ok(Box::new(RateLimitFilter::new( - config, - Metrics::new(&args.metrics_registry)?, - ))) - } - } -} - -impl RateLimitFilter { - /// new returns a new RateLimitFilter. It spawns a future in the background +impl LocalRateLimit { + /// new returns a new LocalRateLimit. It spawns a future in the background /// that periodically refills the rate limiter's tokens. fn new(config: Config, metrics: Metrics) -> Self { let (shutdown_tx, mut shutdown_rx) = channel(); @@ -150,7 +89,7 @@ impl RateLimitFilter { } }); - RateLimitFilter { + LocalRateLimit { available_tokens: tokens, metrics, shutdown_tx: Some(shutdown_tx), @@ -185,7 +124,7 @@ impl RateLimitFilter { } } -impl Drop for RateLimitFilter { +impl Drop for LocalRateLimit { fn drop(&mut self) { if let Some(shutdown_tx) = self.shutdown_tx.take() { shutdown_tx.send(()).ok(); @@ -193,7 +132,7 @@ impl Drop for RateLimitFilter { } } -impl Filter for RateLimitFilter { +impl Filter for LocalRateLimit { fn read(&self, ctx: ReadContext) -> Option { self.acquire_token().map(|()| ctx.into()).or_else(|| { self.metrics.packets_dropped_total.inc(); @@ -202,6 +141,73 @@ impl Filter for RateLimitFilter { } } +/// Creates instances of [`LocalRateLimit`]. 
+#[derive(Default)]
+struct LocalRateLimitFactory;
+
+impl FilterFactory for LocalRateLimitFactory {
+    fn name(&self) -> &'static str {
+        NAME
+    }
+
+    fn create_filter(&self, args: CreateFilterArgs) -> Result<Box<dyn Filter>, Error> {
+        let config: Config = self
+            .require_config(args.config)?
+            .deserialize::<Config, ProtoConfig>(self.name())?;
+
+        if config.period.lt(&Duration::from_millis(100)) {
+            Err(Error::FieldInvalid {
+                field: "period".into(),
+                reason: "value must be at least 100ms".into(),
+            })
+        } else {
+            Ok(Box::new(LocalRateLimit::new(
+                config,
+                Metrics::new(&args.metrics_registry)?,
+            )))
+        }
+    }
+}
+
+/// Config represents a [self]'s configuration.
+#[derive(Serialize, Deserialize, Debug, PartialEq)]
+pub struct Config {
+    /// The maximum number of packets allowed to be forwarded by the rate
+    /// limiter in a given duration.
+    pub max_packets: usize,
+    /// The duration during which max_packets applies. If none is provided, it
+    /// defaults to one second.
+    #[serde(with = "humantime_serde", default = "default_period")]
+    pub period: Duration,
+}
+
+/// default value for [`Config::period`]
+fn default_period() -> Duration {
+    Duration::from_secs(1)
+}
+
+impl TryFrom<ProtoConfig> for Config {
+    type Error = ConvertProtoConfigError;
+
+    fn try_from(p: ProtoConfig) -> Result<Self, Self::Error> {
+        Ok(Self {
+            max_packets: p.max_packets as usize,
+            period: p
+                .period
+                .map(|period| {
+                    period.try_into().map_err(|err| {
+                        ConvertProtoConfigError::new(
+                            format!("invalid duration: {:?}", err),
+                            Some("period".into()),
+                        )
+                    })
+                })
+                .transpose()?
+                .unwrap_or_else(default_period),
+        })
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use std::convert::TryFrom;
@@ -214,13 +220,13 @@ mod tests {
     use crate::cluster::Endpoint;
     use crate::config::Endpoints;
     use crate::filters::{
-        extensions::local_rate_limit::{metrics::Metrics, Config, RateLimitFilter},
+        local_rate_limit::{metrics::Metrics, Config, LocalRateLimit},
         Filter, ReadContext,
     };
     use crate::test_utils::assert_write_no_change;
 
-    fn rate_limiter(config: Config) -> RateLimitFilter {
-        RateLimitFilter::new(config, Metrics::new(&Registry::default()).unwrap())
+    fn rate_limiter(config: Config) -> LocalRateLimit {
+        LocalRateLimit::new(config, Metrics::new(&Registry::default()).unwrap())
     }
 
     #[test]
diff --git a/src/filters/extensions/local_rate_limit/metrics.rs b/src/filters/local_rate_limit/metrics.rs
similarity index 100%
rename from src/filters/extensions/local_rate_limit/metrics.rs
rename to src/filters/local_rate_limit/metrics.rs
diff --git a/src/filters/extensions/local_rate_limit/proto.rs b/src/filters/local_rate_limit/proto.rs
similarity index 100%
rename from src/filters/extensions/local_rate_limit/proto.rs
rename to src/filters/local_rate_limit/proto.rs
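The relocated `Config` keeps its humantime-encoded `period`, so durations may be written as human-readable strings or omitted entirely. A short sketch (not part of the patch; it assumes `serde_yaml`, as in the tests):

```rust
use std::time::Duration;

use quilkin::filters::local_rate_limit::Config;

fn main() {
    // `period` may be omitted thanks to `default_period`.
    let config: Config = serde_yaml::from_str("max_packets: 1000").unwrap();
    assert_eq!(config.period, Duration::from_secs(1));

    // `humantime_serde` accepts strings such as "500ms"; note the factory
    // still rejects any period under 100ms at filter-creation time.
    let config: Config = serde_yaml::from_str("max_packets: 1000\nperiod: 500ms").unwrap();
    assert_eq!(config.period, Duration::from_millis(500));
}
```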
diff --git a/src/filters/extensions.rs b/src/filters/metadata.rs
similarity index 55%
rename from src/filters/extensions.rs
rename to src/filters/metadata.rs
index 27f6899020..f956c60af7 100644
--- a/src/filters/extensions.rs
+++ b/src/filters/metadata.rs
@@ -1,5 +1,5 @@
 /*
- * Copyright 2020 Google LLC
+ * Copyright 2021 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
@@ -14,22 +14,9 @@
 * limitations under the License.
 */
 
-//! Useful filters for common operations.
-
-pub use capture_bytes::CaptureBytesFactory;
-pub use compress::CompressFactory;
-pub use concatenate_bytes::ConcatBytesFactory;
-pub use debug::DebugFactory;
-pub use load_balancer::LoadBalancerFilterFactory;
-pub use local_rate_limit::RateLimitFilterFactory;
-pub use token_router::TokenRouterFactory;
-
-mod capture_bytes;
-mod compress;
-mod concatenate_bytes;
-mod debug;
-mod load_balancer;
-mod local_rate_limit;
-mod token_router;
+//! Well known dynamic metadata used by Quilkin.
 
+/// The default key under which the [`super::capture_bytes`] filter puts the
+/// byte slices it extracts from each packet.
+/// - **Type** `Vec<u8>`
 pub const CAPTURED_BYTES: &str = "quilkin.dev/captured_bytes";
diff --git a/src/filters/set.rs b/src/filters/set.rs
index c6c45b1d9d..892725c662 100644
--- a/src/filters/set.rs
+++ b/src/filters/set.rs
@@ -18,7 +18,7 @@ use std::iter::FromIterator;
 
 use slog::Logger;
 
-use crate::filters::{extensions, DynFilterFactory};
+use crate::filters::{self, DynFilterFactory};
 
 #[cfg(doc)]
 use crate::filters::{FilterFactory, FilterRegistry};
@@ -35,13 +35,13 @@ impl FilterSet {
     /// with each endpoint.
     ///
     /// Current default filters:
-    /// - [`Debug`][extensions::DebugFactory]
-    /// - [`LocalRateLimit`][extensions::RateLimitFilterFactory]
-    /// - [`ConcatBytes`][extensions::ConcatBytesFactory]
-    /// - [`LoadBalancer`][extensions::LoadBalancerFilterFactory]
-    /// - [`CaptureBytes`][extensions::CaptureBytesFactory]
-    /// - [`TokenRouter`][extensions::TokenRouterFactory]
-    /// - [`Compress`][extensions::CompressFactory]
+    /// - [`debug`][filters::debug]
+    /// - [`local_rate_limit`][filters::local_rate_limit]
+    /// - [`concatenate_bytes`][filters::concatenate_bytes]
+    /// - [`load_balancer`][filters::load_balancer]
+    /// - [`capture_bytes`][filters::capture_bytes]
+    /// - [`token_router`][filters::token_router]
+    /// - [`compress`][filters::compress]
     pub fn default(base: &Logger) -> Self {
         Self::default_with(base, Option::into_iter(None))
     }
@@ -57,13 +57,13 @@ impl FilterSet {
     ) -> Self {
         Self::with(
             std::array::IntoIter::new([
-                Box::from(extensions::DebugFactory::new(base)) as DynFilterFactory,
-                Box::from(extensions::RateLimitFilterFactory::default()),
-                Box::from(extensions::ConcatBytesFactory::default()),
-                Box::from(extensions::LoadBalancerFilterFactory::default()),
-                Box::from(extensions::CaptureBytesFactory::new(base)),
-                Box::from(extensions::TokenRouterFactory::new(base)),
-                Box::from(extensions::CompressFactory::new(base)),
+                filters::debug::factory(base),
+                filters::local_rate_limit::factory(),
+                filters::concatenate_bytes::factory(),
+                filters::load_balancer::factory(),
+                filters::capture_bytes::factory(base),
+                filters::token_router::factory(base),
+                filters::compress::factory(base),
             ])
             .chain(filters),
         )
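`FilterSet::default_with` is how custom builds extend the seven defaults above. A sketch of the call (not part of the patch; the `IntoIterator<Item = DynFilterFactory>` bound is inferred from the `.chain(filters)` shown above):

```rust
use quilkin::filters::{self, FilterSet};
use slog::{o, Discard, Logger};

fn main() {
    let log = Logger::root(Discard, o!());

    // The default set, plus one extra factory appended via `chain`; any
    // iterator of DynFilterFactory values works here.
    let _set = FilterSet::default_with(
        &log,
        std::iter::once(filters::debug::factory(&log)),
    );
}
```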
diff --git a/src/filters/extensions/token_router.rs b/src/filters/token_router.rs
similarity index 95%
rename from src/filters/extensions/token_router.rs
rename to src/filters/token_router.rs
index 141ee2bd97..b72e877acf 100644
--- a/src/filters/extensions/token_router.rs
+++ b/src/filters/token_router.rs
@@ -26,56 +26,40 @@ use slog::{error, o, Logger};
 use crate::{
     config::{RetainedItems, LOG_SAMPLING_RATE},
-    filters::{
-        extensions::{token_router::metrics::Metrics, CAPTURED_BYTES},
-        prelude::*,
-    },
+    filters::{metadata::CAPTURED_BYTES, prelude::*},
 };
 
-use self::quilkin::extensions::filters::token_router::v1alpha1::TokenRouter as ProtoConfig;
-
-#[derive(Serialize, Deserialize, Debug, PartialEq)]
-#[serde(default)]
-struct Config {
-    /// the key to use when retrieving the token from the Filter's dynamic metadata
-    #[serde(rename = "metadataKey", default = "default_metadata_key")]
-    metadata_key: String,
-}
+use metrics::Metrics;
 
-/// Default value for [`Config::metadata_key`]
-fn default_metadata_key() -> String {
-    CAPTURED_BYTES.into()
-}
-
-impl Default for Config {
-    fn default() -> Self {
-        Self {
-            metadata_key: default_metadata_key(),
-        }
-    }
-}
+use self::quilkin::extensions::filters::token_router::v1alpha1::TokenRouter as ProtoConfig;
 
-impl TryFrom<ProtoConfig> for Config {
-    type Error = ConvertProtoConfigError;
+pub const NAME: &str = "quilkin.extensions.filters.token_router.v1alpha1.TokenRouter";
 
-    fn try_from(p: ProtoConfig) -> Result<Self, Self::Error> {
-        Ok(Self {
-            metadata_key: p.metadata_key.unwrap_or_else(default_metadata_key),
-        })
-    }
+/// Returns a factory for creating token routing filters.
+pub fn factory(base: &Logger) -> DynFilterFactory {
+    Box::from(TokenRouterFactory::new(base))
 }
 
 /// Filter that only allows packets to be passed to Endpoints that have a matching
 /// connection_id to the token stored in the Filter's dynamic metadata.
-#[crate::filter("quilkin.extensions.filters.token_router.v1alpha1.TokenRouter")]
 struct TokenRouter {
     log: Logger,
     metadata_key: Arc<String>,
     metrics: Metrics,
 }
 
+impl TokenRouter {
+    fn new(base: &Logger, config: Config, metrics: Metrics) -> Self {
+        Self {
+            log: base.new(o!("source" => "extensions::TokenRouter")),
+            metadata_key: Arc::new(config.metadata_key),
+            metrics,
+        }
+    }
+}
+
 /// Factory for the TokenRouter filter
-pub struct TokenRouterFactory {
+struct TokenRouterFactory {
     log: Logger,
 }
 
@@ -87,7 +71,7 @@ impl TokenRouterFactory {
 
 impl FilterFactory for TokenRouterFactory {
     fn name(&self) -> &'static str {
-        TokenRouter::FILTER_NAME
+        NAME
     }
 
     fn create_filter(&self, args: CreateFilterArgs) -> Result<Box<dyn Filter>, Error> {
@@ -105,16 +89,6 @@ impl FilterFactory for TokenRouterFactory {
     }
 }
 
-impl TokenRouter {
-    fn new(base: &Logger, config: Config, metrics: Metrics) -> Self {
-        Self {
-            log: base.new(o!("source" => "extensions::TokenRouter")),
-            metadata_key: Arc::new(config.metadata_key),
-            metrics,
-        }
-    }
-}
-
 impl Filter for TokenRouter {
     fn read(&self, mut ctx: ReadContext) -> Option<ReadResponse> {
         match ctx.metadata.get(self.metadata_key.as_ref()) {
@@ -159,6 +133,37 @@ impl Filter for TokenRouter {
     }
 }
 
+#[derive(Serialize, Deserialize, Debug, PartialEq)]
+#[serde(default)]
+pub struct Config {
+    /// the key to use when retrieving the token from the Filter's dynamic metadata
+    #[serde(rename = "metadataKey", default = "default_metadata_key")]
+    pub metadata_key: String,
+}
+
+/// Default value for [`Config::metadata_key`]
+fn default_metadata_key() -> String {
+    CAPTURED_BYTES.into()
+}
+
+impl Default for Config {
+    fn default() -> Self {
+        Self {
+            metadata_key: default_metadata_key(),
+        }
+    }
+}
+
+impl TryFrom<ProtoConfig> for Config {
+    type Error = ConvertProtoConfigError;
+
+    fn try_from(p: ProtoConfig) -> Result<Self, Self::Error> {
+        Ok(Self {
+            metadata_key: p.metadata_key.unwrap_or_else(default_metadata_key),
+        })
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use std::convert::TryFrom;
@@ -176,7 +181,7 @@ mod tests {
     };
     use crate::cluster::Endpoint;
     use crate::filters::{
-        extensions::CAPTURED_BYTES, CreateFilterArgs, Filter, FilterFactory, ReadContext,
+        metadata::CAPTURED_BYTES, CreateFilterArgs, Filter, FilterFactory, ReadContext,
     };
 
     const TOKEN_KEY: &str = "TOKEN";
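Because the router's `Config` defaults `metadataKey` to the `CAPTURED_BYTES` key that moved into `filters::metadata` above, an empty config wires it to `capture_bytes` automatically. A sketch (not part of the patch; it assumes the `metadata` module is publicly visible, as the test imports suggest):

```rust
use quilkin::filters::{metadata::CAPTURED_BYTES, token_router::Config};

fn main() {
    // An empty mapping is valid (`#[serde(default)]` on the struct) and
    // falls back to the well-known capture_bytes key.
    let config: Config = serde_yaml::from_str("{}").unwrap();
    assert_eq!(config.metadata_key, CAPTURED_BYTES);
}
```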
diff --git a/src/filters/extensions/token_router/metrics.rs b/src/filters/token_router/metrics.rs
similarity index 100%
rename from src/filters/extensions/token_router/metrics.rs
rename to src/filters/token_router/metrics.rs
diff --git a/src/filters/extensions/token_router/proto.rs b/src/filters/token_router/proto.rs
similarity index 100%
rename from src/filters/extensions/token_router/proto.rs
rename to src/filters/token_router/proto.rs
diff --git a/src/lib.rs b/src/lib.rs
index 22902de9f8..cacb9653ea 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -24,7 +24,7 @@ pub mod test_utils;
 pub(crate) mod utils;
 pub(crate) mod xds;
 
-pub use quilkin_macros::{filter, include_proto};
+pub use quilkin_macros::include_proto;
 
 /// Run tests in our external documentation. This is only available in
 /// nightly at the moment, but is stable on nightly and will be available in
diff --git a/src/test_utils.rs b/src/test_utils.rs
index 13b831df75..95956b969d 100644
--- a/src/test_utils.rs
+++ b/src/test_utils.rs
@@ -26,7 +26,7 @@ use tokio::sync::{mpsc, oneshot, watch};
 
 use crate::cluster::Endpoint;
 use crate::config::{Builder as ConfigBuilder, Config, EndPoint, Endpoints};
-use crate::filters::{prelude::*, DynFilterFactory, FilterChain, FilterRegistry, FilterSet};
+use crate::filters::{prelude::*, FilterChain, FilterRegistry, FilterSet};
 use crate::proxy::{Builder, PendingValidation};
 
 pub struct TestFilterFactory {}
diff --git a/tests/compress.rs b/tests/compress.rs
index 6f14309a75..e3d6d37661 100644
--- a/tests/compress.rs
+++ b/tests/compress.rs
@@ -22,7 +22,7 @@ mod tests {
     use tokio::time::{timeout, Duration};
 
     use quilkin::config::{Builder, EndPoint, Filter};
-    use quilkin::filters::{extensions::CompressFactory, FilterFactory};
+    use quilkin::filters::compress;
     use quilkin::test_utils::{logger, TestHelper};
 
     #[tokio::test]
@@ -41,7 +41,7 @@ on_write: COMPRESS
             .with_port(server_port)
             .with_static(
                 vec![Filter {
-                    name: CompressFactory::new(&log).name().into(),
+                    name: compress::factory(&log).name().into(),
                     config: serde_yaml::from_str(yaml).unwrap(),
                 }],
                 vec![EndPoint::new(echo)],
@@ -60,7 +60,7 @@ on_write: DECOMPRESS
             .with_port(client_port)
             .with_static(
                 vec![Filter {
-                    name: CompressFactory::new(&log).name().into(),
+                    name: compress::factory(&log).name().into(),
                     config: serde_yaml::from_str(yaml).unwrap(),
                 }],
                 vec![EndPoint::new(
diff --git a/tests/concatenate_bytes.rs b/tests/concatenate_bytes.rs
index 414a53fe23..695931da48 100644
--- a/tests/concatenate_bytes.rs
+++ b/tests/concatenate_bytes.rs
@@ -21,7 +21,7 @@ mod tests {
     use tokio::time::{timeout, Duration};
 
     use quilkin::config::{Builder, EndPoint, Filter};
-    use quilkin::filters::{extensions::ConcatBytesFactory, FilterFactory};
+    use quilkin::filters::concatenate_bytes;
     use quilkin::test_utils::TestHelper;
 
     #[tokio::test]
@@ -38,7 +38,7 @@ bytes: YWJj #abc
             .with_port(server_port)
             .with_static(
                 vec![Filter {
-                    name: ConcatBytesFactory::default().name().into(),
+                    name: concatenate_bytes::factory().name().into(),
                     config: serde_yaml::from_str(yaml).unwrap(),
                 }],
                 vec![EndPoint::new(echo)],
diff --git a/tests/filter_order.rs b/tests/filter_order.rs
index 8105dc9a4f..99fc868f6f 100644
--- a/tests/filter_order.rs
+++ b/tests/filter_order.rs
@@ -24,10 +24,7 @@ mod tests {
     use tokio::time::{timeout, Duration};
 
     use quilkin::config::{Builder, EndPoint, Filter};
-    use quilkin::filters::{
-        extensions::{CompressFactory, ConcatBytesFactory},
-        FilterFactory,
-    };
+    use quilkin::filters::{compress, concatenate_bytes};
     use quilkin::test_utils::TestHelper;
 
     #[tokio::test]
@@ -64,15 +61,15 @@ on_write: DECOMPRESS
             .with_static(
                 vec![
                     Filter {
-                        name: ConcatBytesFactory::default().name().into(),
+                        name: concatenate_bytes::factory().name().into(),
                         config: serde_yaml::from_str(yaml_concat_read).unwrap(),
                     },
                     Filter {
-                        name: ConcatBytesFactory::default().name().into(),
+                        name: concatenate_bytes::factory().name().into(),
                         config: serde_yaml::from_str(yaml_concat_write).unwrap(),
                     },
                     Filter {
-                        name: CompressFactory::new(&t.log).name().into(),
+                        name: compress::factory(&t.log).name().into(),
                         config: serde_yaml::from_str(yaml_compress).unwrap(),
                     },
                 ],
diff --git a/tests/filters.rs b/tests/filters.rs
index 9823248ad8..f229e12357 100644
--- a/tests/filters.rs
+++ b/tests/filters.rs
@@ -24,7 +24,7 @@ mod tests {
     use slog::info;
 
     use quilkin::config::{Builder as ConfigBuilder, EndPoint, Filter};
-    use quilkin::filters::{extensions::DebugFactory, FilterFactory};
+    use quilkin::filters::debug;
     use quilkin::proxy::Builder as ProxyBuilder;
     use quilkin::test_utils::{new_registry, TestHelper};
     use std::sync::Arc;
@@ -112,7 +112,7 @@ mod tests {
         let mut t = TestHelper::default();
 
         // handy for grabbing the configuration name
-        let factory = DebugFactory::new(&t.log);
+        let factory = debug::factory(&t.log);
 
         // create an echo server as an endpoint.
         let echo = t.run_echo_server().await;
diff --git a/tests/load_balancer.rs b/tests/load_balancer.rs
index 23c77ded86..6cc52aea7a 100644
--- a/tests/load_balancer.rs
+++ b/tests/load_balancer.rs
@@ -20,7 +20,7 @@ mod tests {
     use std::sync::{Arc, Mutex};
 
     use quilkin::config::{Builder as ConfigBuilder, EndPoint, Filter};
-    use quilkin::filters::{extensions::LoadBalancerFilterFactory, FilterFactory};
+    use quilkin::filters::load_balancer;
     use quilkin::test_utils::TestHelper;
 
     #[tokio::test]
@@ -48,7 +48,7 @@ policy: ROUND_ROBIN
             .with_port(server_port)
             .with_static(
                 vec![Filter {
-                    name: LoadBalancerFilterFactory::default().name().into(),
+                    name: load_balancer::factory().name().into(),
                     config: serde_yaml::from_str(yaml).unwrap(),
                 }],
                 echo_addresses
diff --git a/tests/local_rate_limit.rs b/tests/local_rate_limit.rs
index 7ad400ba98..4a528decde 100644
--- a/tests/local_rate_limit.rs
+++ b/tests/local_rate_limit.rs
@@ -23,7 +23,7 @@ mod tests {
     use tokio::time::{timeout, Duration};
 
     use quilkin::config::{Builder as ConfigBuilder, EndPoint, Filter};
-    use quilkin::filters::{extensions::RateLimitFilterFactory, FilterFactory};
+    use quilkin::filters::local_rate_limit;
     use quilkin::test_utils::TestHelper;
 
     #[tokio::test]
@@ -41,7 +41,7 @@ period: 1s
             .with_port(server_port)
             .with_static(
                 vec![Filter {
-                    name: RateLimitFilterFactory::default().name().into(),
+                    name: local_rate_limit::factory().name().into(),
                     config: serde_yaml::from_str(yaml).unwrap(),
                 }],
                 vec![EndPoint::new(echo)],
diff --git a/tests/token_router.rs b/tests/token_router.rs
index 290db54467..7dbdeb5515 100644
--- a/tests/token_router.rs
+++ b/tests/token_router.rs
@@ -21,10 +21,7 @@ mod tests {
     use tokio::time::{timeout, Duration};
 
     use quilkin::config::{Builder, EndPoint, Filter};
-    use quilkin::filters::{
-        extensions::{CaptureBytesFactory, TokenRouterFactory},
-        FilterFactory,
-    };
+    use quilkin::filters::{capture_bytes, token_router};
     use quilkin::test_utils::{logger, TestHelper};
 
     /// This test covers both token_router and capture_bytes filters,
@@ -50,11 +47,11 @@ quilkin.dev:
             .with_static(
                 vec![
                     Filter {
-                        name: CaptureBytesFactory::new(&log).name().into(),
+                        name: capture_bytes::factory(&log).name().into(),
                         config: serde_yaml::from_str(capture_yaml).unwrap(),
                     },
                     Filter {
-                        name: TokenRouterFactory::new(&log).name().into(),
+                        name: token_router::factory(&log).name().into(),
                        config: None,
                    },
                ],
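Taken together, the test updates above all follow one shape: build a static config whose filter names come from the module factories instead of factory structs. A condensed sketch (not part of the patch; `Builder::empty()` and `.build()` are assumed from the crate's existing test helpers):

```rust
use quilkin::config::{Builder, EndPoint, Filter};
use quilkin::filters::{token_router, FilterFactory};
use slog::{o, Discard, Logger};

fn main() {
    let log = Logger::root(Discard, o!());

    // A single token_router using its default config (`config: None`),
    // routing to one placeholder endpoint.
    let _config = Builder::empty()
        .with_port(7000)
        .with_static(
            vec![Filter {
                name: token_router::factory(&log).name().into(),
                config: None,
            }],
            vec![EndPoint::new("127.0.0.1:26000".parse().unwrap())],
        )
        .build();
}
```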