From 08e31228d011e5d8f274a4276215528339668542 Mon Sep 17 00:00:00 2001 From: John Sonnenschein Date: Wed, 8 Nov 2023 10:37:36 -0800 Subject: [PATCH] feat(http_server source): allow wildcard matching in headers --- src/sources/http_server.rs | 91 ++++++++++++++++++++++++++++---------- 1 file changed, 67 insertions(+), 24 deletions(-) diff --git a/src/sources/http_server.rs b/src/sources/http_server.rs index d1553ae6944d09..b2c667e31a5fea 100644 --- a/src/sources/http_server.rs +++ b/src/sources/http_server.rs @@ -6,14 +6,14 @@ use http::{StatusCode, Uri}; use http_serde; use tokio_util::codec::Decoder as _; use vrl::value::{kind::Collection, Kind}; -use warp::http::HeaderMap; +use warp::http::{ HeaderMap, HeaderValue}; use codecs::{ decoding::{DeserializerConfig, FramingConfig}, BytesDecoderConfig, BytesDeserializerConfig, JsonDeserializerConfig, NewlineDelimitedDecoderConfig, }; -use lookup::{lookup_v2::OptionalValuePath, OwnedValuePath, owned_value_path, path}; +use lookup::{lookup_v2::OptionalValuePath, owned_value_path, path}; use vector_config::configurable_component; use vector_core::{ config::{DataType, LegacyKey, LogNamespace}, @@ -322,6 +322,26 @@ fn remove_duplicates(mut list: Vec, list_name: &str) -> Vec { } list } +/// Removes glob matches from the list, and logs a `warn!()` for each. +fn remove_glob_matches(list: &Vec, patterns: &Vec, list_name: &str) -> Vec { + + let mut ret = Vec::new(); + + for pattern in patterns.iter() { + for name in list.iter() { + if pattern.matches(name) { + warn!( + "`{}` configuration contains specific entry `{}` that matches wildcard entry `{}`. Removing.", + list_name, name, pattern.as_str() + ); + } else { + ret.push(name.clone()); + } + } + } + + ret +} #[async_trait::async_trait] #[typetag::serde(name = "http_server")] @@ -330,8 +350,23 @@ impl SourceConfig for SimpleHttpConfig { let decoder = self.get_decoding_config()?.build()?; let log_namespace = cx.log_namespace(self.log_namespace); + let (glob_headers, mut headers): (Vec, Vec) = self.headers + .clone() + .into_iter() + .partition(|s| s.contains('*')); + + let glob_headers: Vec = remove_duplicates(glob_headers, "headers") + .into_iter() + .map(|s| glob::Pattern::new(&s).unwrap()) + .collect(); + + if !glob_headers.is_empty() { + headers = remove_glob_matches(&headers, &glob_headers, "headers"); + } + let source = SimpleHttpSource { - headers: remove_duplicates(self.headers.clone(), "headers"), + headers: remove_duplicates(headers, "headers"), + glob_headers: glob_headers.clone(), query_parameters: remove_duplicates(self.query_parameters.clone(), "query_parameters"), path_key: self.path_key.clone(), decoder, @@ -378,6 +413,7 @@ impl SourceConfig for SimpleHttpConfig { #[derive(Clone)] struct SimpleHttpSource { headers: Vec, + glob_headers: Vec, query_parameters: Vec, path_key: OptionalValuePath, decoder: Decoder, @@ -407,29 +443,36 @@ impl HttpSource for SimpleHttpSource { request_path.to_owned(), ); - // add each header to each event - let mut received_headers = headers_config.clone(); - for header_name in &self.headers { - if let Some(header_value) = received_headers.remove(header_name) { - self.log_namespace.insert_source_metadata( - SimpleHttpConfig::NAME, - log, - Some(LegacyKey::InsertIfEmpty(path!(header_name))), - path!("headers", header_name), - Value::from(header_value.as_bytes()), - ); + // add any matching wildcard headers to each event + for header_pattern in &self.glob_headers { + for header_name in headers_config.keys() { + if header_pattern.matches(header_name.as_str()) { + + let value = headers_config.get(header_name).map(HeaderValue::as_bytes); + + self.log_namespace.insert_source_metadata( + SimpleHttpConfig::NAME, + log, + Some(LegacyKey::InsertIfEmpty(path!(header_name.as_str()))), + path!("headers", header_name.as_str()), + Value::from(value.map(Bytes::copy_from_slice)), + ); + } } } - if self.log_namespace == LogNamespace::Vector { - for (header_name, value) in received_headers.iter() { - self.log_namespace.insert_source_metadata( - SimpleHttpConfig::NAME, - log, - None::>, - path!("headers", header_name.as_str()), - Value::from(value.as_bytes()), - ); - } + + // add each specific header to each event + for header_name in &self.headers { + let value = headers_config.get(header_name).map(HeaderValue::as_bytes); + + self.log_namespace.insert_source_metadata( + SimpleHttpConfig::NAME, + log, + Some(LegacyKey::InsertIfEmpty(path!(header_name))), + path!("headers", header_name), + Value::from(value.map(Bytes::copy_from_slice)), + ); + } self.log_namespace.insert_standard_vector_source_metadata(