Skip to content

Commit

Permalink
feat(http_server source): allow wildcard matching in headers
Browse files Browse the repository at this point in the history
  • Loading branch information
sonnens committed Nov 8, 2023
1 parent b4d120d commit 08e3122
Showing 1 changed file with 67 additions and 24 deletions.
91 changes: 67 additions & 24 deletions src/sources/http_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@ use http::{StatusCode, Uri};
use http_serde;
use tokio_util::codec::Decoder as _;
use vrl::value::{kind::Collection, Kind};
use warp::http::HeaderMap;
use warp::http::{ HeaderMap, HeaderValue};

use codecs::{
decoding::{DeserializerConfig, FramingConfig},
BytesDecoderConfig, BytesDeserializerConfig, JsonDeserializerConfig,
NewlineDelimitedDecoderConfig,
};
use lookup::{lookup_v2::OptionalValuePath, OwnedValuePath, owned_value_path, path};
use lookup::{lookup_v2::OptionalValuePath, owned_value_path, path};
use vector_config::configurable_component;
use vector_core::{
config::{DataType, LegacyKey, LogNamespace},
Expand Down Expand Up @@ -322,6 +322,26 @@ fn remove_duplicates(mut list: Vec<String>, list_name: &str) -> Vec<String> {
}
list
}
/// Removes glob matches from the list, and logs a `warn!()` for each.
fn remove_glob_matches(list: &Vec<String>, patterns: &Vec<glob::Pattern>, list_name: &str) -> Vec<String> {

let mut ret = Vec::new();

for pattern in patterns.iter() {
for name in list.iter() {
if pattern.matches(name) {
warn!(
"`{}` configuration contains specific entry `{}` that matches wildcard entry `{}`. Removing.",
list_name, name, pattern.as_str()
);
} else {
ret.push(name.clone());
}
}
}

ret
}

#[async_trait::async_trait]
#[typetag::serde(name = "http_server")]
Expand All @@ -330,8 +350,23 @@ impl SourceConfig for SimpleHttpConfig {
let decoder = self.get_decoding_config()?.build()?;
let log_namespace = cx.log_namespace(self.log_namespace);

let (glob_headers, mut headers): (Vec<String>, Vec<String>) = self.headers
.clone()
.into_iter()
.partition(|s| s.contains('*'));

let glob_headers: Vec<glob::Pattern> = remove_duplicates(glob_headers, "headers")
.into_iter()
.map(|s| glob::Pattern::new(&s).unwrap())
.collect();

if !glob_headers.is_empty() {
headers = remove_glob_matches(&headers, &glob_headers, "headers");
}

let source = SimpleHttpSource {
headers: remove_duplicates(self.headers.clone(), "headers"),
headers: remove_duplicates(headers, "headers"),
glob_headers: glob_headers.clone(),
query_parameters: remove_duplicates(self.query_parameters.clone(), "query_parameters"),
path_key: self.path_key.clone(),
decoder,
Expand Down Expand Up @@ -378,6 +413,7 @@ impl SourceConfig for SimpleHttpConfig {
#[derive(Clone)]
struct SimpleHttpSource {
headers: Vec<String>,
glob_headers: Vec<glob::Pattern>,
query_parameters: Vec<String>,
path_key: OptionalValuePath,
decoder: Decoder,
Expand Down Expand Up @@ -407,29 +443,36 @@ impl HttpSource for SimpleHttpSource {
request_path.to_owned(),
);

// add each header to each event
let mut received_headers = headers_config.clone();
for header_name in &self.headers {
if let Some(header_value) = received_headers.remove(header_name) {
self.log_namespace.insert_source_metadata(
SimpleHttpConfig::NAME,
log,
Some(LegacyKey::InsertIfEmpty(path!(header_name))),
path!("headers", header_name),
Value::from(header_value.as_bytes()),
);
// add any matching wildcard headers to each event
for header_pattern in &self.glob_headers {
for header_name in headers_config.keys() {
if header_pattern.matches(header_name.as_str()) {

let value = headers_config.get(header_name).map(HeaderValue::as_bytes);

self.log_namespace.insert_source_metadata(
SimpleHttpConfig::NAME,
log,
Some(LegacyKey::InsertIfEmpty(path!(header_name.as_str()))),
path!("headers", header_name.as_str()),
Value::from(value.map(Bytes::copy_from_slice)),
);
}
}
}
if self.log_namespace == LogNamespace::Vector {
for (header_name, value) in received_headers.iter() {
self.log_namespace.insert_source_metadata(
SimpleHttpConfig::NAME,
log,
None::<LegacyKey<&OwnedValuePath>>,
path!("headers", header_name.as_str()),
Value::from(value.as_bytes()),
);
}

// add each specific header to each event
for header_name in &self.headers {
let value = headers_config.get(header_name).map(HeaderValue::as_bytes);

self.log_namespace.insert_source_metadata(
SimpleHttpConfig::NAME,
log,
Some(LegacyKey::InsertIfEmpty(path!(header_name))),
path!("headers", header_name),
Value::from(value.map(Bytes::copy_from_slice)),
);

}

self.log_namespace.insert_standard_vector_source_metadata(
Expand Down

0 comments on commit 08e3122

Please sign in to comment.