diff --git a/src/sources/http_server.rs b/src/sources/http_server.rs index 8b5cfefcf57ae..d1553ae6944d0 100644 --- a/src/sources/http_server.rs +++ b/src/sources/http_server.rs @@ -6,14 +6,14 @@ use http::{StatusCode, Uri}; use http_serde; use tokio_util::codec::Decoder as _; use vrl::value::{kind::Collection, Kind}; -use warp::http::{HeaderMap, HeaderValue}; +use warp::http::HeaderMap; use codecs::{ decoding::{DeserializerConfig, FramingConfig}, BytesDecoderConfig, BytesDeserializerConfig, JsonDeserializerConfig, NewlineDelimitedDecoderConfig, }; -use lookup::{lookup_v2::OptionalValuePath, owned_value_path, path}; +use lookup::{lookup_v2::OptionalValuePath, OwnedValuePath, owned_value_path, path}; use vector_config::configurable_component; use vector_core::{ config::{DataType, LegacyKey, LogNamespace}, @@ -408,16 +408,28 @@ impl HttpSource for SimpleHttpSource { ); // add each header to each event + let mut received_headers = headers_config.clone(); for header_name in &self.headers { - let value = headers_config.get(header_name).map(HeaderValue::as_bytes); - - self.log_namespace.insert_source_metadata( - SimpleHttpConfig::NAME, - log, - Some(LegacyKey::InsertIfEmpty(path!(header_name))), - path!("headers", header_name), - Value::from(value.map(Bytes::copy_from_slice)), - ); + if let Some(header_value) = received_headers.remove(header_name) { + self.log_namespace.insert_source_metadata( + SimpleHttpConfig::NAME, + log, + Some(LegacyKey::InsertIfEmpty(path!(header_name))), + path!("headers", header_name), + Value::from(header_value.as_bytes()), + ); + } + } + if self.log_namespace == LogNamespace::Vector { + for (header_name, value) in received_headers.iter() { + self.log_namespace.insert_source_metadata( + SimpleHttpConfig::NAME, + log, + None::>, + path!("headers", header_name.as_str()), + Value::from(value.as_bytes()), + ); + } } self.log_namespace.insert_standard_vector_source_metadata(